Skip to content

Commit

Permalink
convert between rust type and pb
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Jan 19, 2024
1 parent 51c6ff8 commit 570e5f3
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 11 deletions.
7 changes: 5 additions & 2 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,9 @@ message AggCall {
message WindowFrame {
enum Type {
TYPE_UNSPECIFIED = 0;
// RANGE = 1;
TYPE_RANGE = 1;
TYPE_ROWS = 2;
// GROUPS = 3;
// TYPE_GROUPS = 3;
}
enum BoundType {
BOUND_TYPE_UNSPECIFIED = 0;
Expand Down Expand Up @@ -441,6 +441,9 @@ message WindowFrame {
Bound start = 2;
Bound end = 3;
Exclusion exclusion = 4;

// The data type of offset value in `RANGE` frame.
optional data.DataType offset_data_type = 10;
}

message WindowFunction {
Expand Down
94 changes: 88 additions & 6 deletions src/expr/core/src/window_function/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

use std::fmt::Display;

use anyhow::Context;
use enum_as_inner::EnumAsInner;
use parse_display::Display;
use risingwave_common::bail;
use risingwave_common::types::{DataType, IsNegative, ScalarImpl, ScalarRefImpl, ToText};
use risingwave_common::types::{DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, ToText};
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_pb::expr::window_frame::{PbBound, PbExclusion};
use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction};
use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding};
Expand Down Expand Up @@ -85,13 +87,25 @@ impl Frame {
impl Frame {
pub fn from_protobuf(frame: &PbWindowFrame) -> Result<Self> {
use risingwave_pb::expr::window_frame::PbType;
let start = frame.get_start()?;
let end = frame.get_end()?;
let bounds = match frame.get_type()? {
PbType::Unspecified => bail!("unspecified type of `WindowFrame`"),
PbType::Rows => {
let start = FrameBound::from_protobuf(frame.get_start()?)?;
let end = FrameBound::from_protobuf(frame.get_end()?)?;
let start = FrameBound::<usize>::from_protobuf(start)?;
let end = FrameBound::<usize>::from_protobuf(end)?;
FrameBounds::Rows(RowsFrameBounds { start, end })
}
PbType::Range => {
let offset_data_type = DataType::from(frame.get_offset_data_type()?);
let start = FrameBound::<ScalarImpl>::from_protobuf(start, &offset_data_type)?;
let end = FrameBound::<ScalarImpl>::from_protobuf(end, &offset_data_type)?;
FrameBounds::Range(RangeFrameBounds {
offset_data_type,
start,
end,
})
}
};
let exclusion = FrameExclusion::from_protobuf(frame.get_exclusion()?)?;
Ok(Self { bounds, exclusion })
Expand All @@ -106,10 +120,19 @@ impl Frame {
start: Some(start.to_protobuf()),
end: Some(end.to_protobuf()),
exclusion,
offset_data_type: None,
},
FrameBounds::Range(RangeFrameBounds {
offset_data_type,
start,
end,
}) => PbWindowFrame {
r#type: PbType::Range as _,
start: Some(start.to_protobuf()),
end: Some(end.to_protobuf()),
exclusion,
offset_data_type: Some(offset_data_type.to_protobuf()),
},
FrameBounds::Range(_) => {
todo!("`RANGE` frame should be temporarily banned in `LogicalOverWindow`")
}
}
}
}
Expand Down Expand Up @@ -180,6 +203,7 @@ pub enum FrameBound<T> {

#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RangeFrameBounds {
pub offset_data_type: DataType,
pub start: FrameBound<ScalarImpl>,
pub end: FrameBound<ScalarImpl>,
}
Expand Down Expand Up @@ -329,6 +353,64 @@ impl FrameBound<usize> {
}
}

impl FrameBound<ScalarImpl> {
pub fn from_protobuf(bound: &PbBound, offset_data_type: &DataType) -> Result<Self> {
use risingwave_pb::expr::window_frame::bound::PbOffset;
use risingwave_pb::expr::window_frame::PbBoundType;

let offset = bound.get_offset()?;
let bound = match offset {
PbOffset::Integer(_) => bail!("offset of `FrameBound<ScalarImpl>` must be `Datum`"),
PbOffset::Datum(offset) => match bound.get_type()? {
PbBoundType::Unspecified => bail!("unspecified type of `FrameBound<ScalarImpl>`"),
PbBoundType::UnboundedPreceding => Self::UnboundedPreceding,
PbBoundType::Preceding => Self::Preceding(
Datum::from_protobuf(offset, offset_data_type)
.context("offset `Datum` is not decodable")?
.context("offset of `FrameBound<ScalarImpl>` must be non-NULL")?,
),
PbBoundType::CurrentRow => Self::CurrentRow,
PbBoundType::Following => Self::Following(
Datum::from_protobuf(offset, offset_data_type)
.context("offset `Datum` is not decodable")?
.context("offset of `FrameBound<ScalarImpl>` must be non-NULL")?,
),
PbBoundType::UnboundedFollowing => Self::UnboundedFollowing,
},
};
Ok(bound)
}

pub fn to_protobuf(&self) -> PbBound {
use risingwave_pb::expr::window_frame::bound::PbOffset;
use risingwave_pb::expr::window_frame::PbBoundType;

let (r#type, offset) = match self {
Self::UnboundedPreceding => (
PbBoundType::UnboundedPreceding,
PbOffset::Datum(Default::default()),
),
Self::Preceding(offset) => (
PbBoundType::Preceding,
PbOffset::Datum(Some(offset).to_protobuf()),
),
Self::CurrentRow => (PbBoundType::CurrentRow, PbOffset::Datum(Default::default())),
Self::Following(offset) => (
PbBoundType::Following,
PbOffset::Datum(Some(offset).to_protobuf()),
),
Self::UnboundedFollowing => (
PbBoundType::UnboundedFollowing,
PbOffset::Datum(Default::default()),
),
};
PbBound {
r#type: r#type as _,
offset: Some(offset),
}
}
}

impl FrameBound<ScalarImpl> {
fn for_display(&self) -> FrameBound<String> {
match self {
Expand Down
10 changes: 7 additions & 3 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,9 +716,13 @@ impl Binder {
let (start, end) = self.bind_window_frame_scalar_impl_bounds(
frame.start_bound,
frame.end_bound,
offset_data_type,
&offset_data_type,
)?;
FrameBounds::Range(RangeFrameBounds { start, end })
FrameBounds::Range(RangeFrameBounds {
start,
end,
offset_data_type,
})
}
WindowFrameUnits::Groups => {
bail_not_implemented!(
Expand Down Expand Up @@ -778,7 +782,7 @@ impl Binder {
&mut self,
start: WindowFrameBound,
end: Option<WindowFrameBound>,
offset_data_type: DataType,
offset_data_type: &DataType,
) -> Result<(FrameBound<ScalarImpl>, FrameBound<ScalarImpl>)> {
let mut convert_bound = |bound| -> Result<FrameBound<_>> {
Ok(match bound {
Expand Down

0 comments on commit 570e5f3

Please sign in to comment.