Skip to content

Commit

Permalink
just passed a quick manual e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>

add comment

Signed-off-by: Richard Chien <[email protected]>

order type little things

Signed-off-by: Richard Chien <[email protected]>

`fnd_affected_ranges` seems to work now

Signed-off-by: Richard Chien <[email protected]>

typo

Signed-off-by: Richard Chien <[email protected]>

typo

Signed-off-by: Richard Chien <[email protected]>

typo

Signed-off-by: Richard Chien <[email protected]>

simplify frame start/end calculation

Signed-off-by: Richard Chien <[email protected]>

minor

Signed-off-by: Richard Chien <[email protected]>

check range frame bounds

Signed-off-by: Richard Chien <[email protected]>

add comment

Signed-off-by: Richard Chien <[email protected]>

display RangeFrameBounds

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Jan 10, 2024
1 parent 3fad06a commit 9845051
Show file tree
Hide file tree
Showing 7 changed files with 1,177 additions and 27 deletions.
276 changes: 269 additions & 7 deletions src/expr/core/src/window_function/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use std::fmt::Display;
use enum_as_inner::EnumAsInner;
use parse_display::Display;
use risingwave_common::bail;
use risingwave_common::types::DataType;
use risingwave_common::types::{
DataType, Datum, ScalarImpl, ScalarRefImpl, Sentinelled, ToDatumRef, ToOwnedDatum, ToText,
};
use risingwave_common::util::sort_util::{Direction, OrderType};
use risingwave_pb::expr::window_frame::{PbBound, PbExclusion};
use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction};
use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding};
Expand Down Expand Up @@ -107,34 +110,40 @@ impl Frame {
end: Some(end.to_protobuf()),
exclusion,
},
FrameBounds::Range(RangeFrameBounds { .. }) => {
todo!() // TODO()
}
}
}
}

#[derive(Display, Debug, Clone, Eq, PartialEq, Hash)]
#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
#[display("{0}")]
pub enum FrameBounds {
Rows(RowsFrameBounds),
// Groups(GroupsFrameBounds),
// Range(RangeFrameBounds),
Range(RangeFrameBounds),
}

impl FrameBounds {
pub fn validate(&self) -> Result<()> {
match self {
Self::Rows(bounds) => bounds.validate(),
Self::Range(bounds) => bounds.validate(),
}
}

pub fn start_is_unbounded(&self) -> bool {
match self {
Self::Rows(RowsFrameBounds { start, .. }) => start.is_unbounded_preceding(),
Self::Range(RangeFrameBounds { start, .. }) => start.is_unbounded_preceding(),
}
}

pub fn end_is_unbounded(&self) -> bool {
match self {
Self::Rows(RowsFrameBounds { end, .. }) => end.is_unbounded_following(),
Self::Range(RangeFrameBounds { end, .. }) => end.is_unbounded_following(),
}
}

Expand All @@ -152,11 +161,150 @@ pub struct RowsFrameBounds {

impl RowsFrameBounds {
fn validate(&self) -> Result<()> {
FrameBound::validate_bounds(&self.start, &self.end)
FrameBound::validate_bounds(&self.start, &self.end, |_| Ok(()))
}
}

#[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RangeFrameBounds {
pub start: FrameBound<ScalarImpl>,
pub end: FrameBound<ScalarImpl>,
}

impl Display for RangeFrameBounds {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"RANGE BETWEEN {} AND {}",
self.start.for_display(),
self.end.for_display()
)?;
Ok(())
}
}

impl RangeFrameBounds {
fn validate(&self) -> Result<()> {
FrameBound::validate_bounds(&self.start, &self.end, |offset| {
match offset.as_scalar_ref_impl() {
// TODO(): use decl macro to merge with the following
ScalarRefImpl::Int16(val) if val < 0 => {
bail!("frame bound offset should be non-negative, but {} is given", val);
}
ScalarRefImpl::Int32(val) if val < 0 => {
bail!("frame bound offset should be non-negative, but {} is given", val);
}
ScalarRefImpl::Int64(val) if val < 0 => {
bail!("frame bound offset should be non-negative, but {} is given", val);
}
// TODO(): datetime types
_ => unreachable!("other order column data types are not supported and should be banned in frontend"),
}
})
}

/// Get the frame start for a given order column value.
///
/// ## Examples
///
/// For the following frames:
///
/// ```sql
/// ORDER BY x ASC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
/// ORDER BY x DESC RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
/// ```
///
/// For any CURRENT ROW with any order value, the frame start is always the first-most row, which is
/// represented by [`Sentinelled::Smallest`].
///
/// For the following frame:
///
/// ```sql
/// ORDER BY x ASC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
/// ```
///
/// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `90`.
///
/// For the following frame:
///
/// ```sql
/// ORDER BY x DESC RANGE BETWEEN 10 PRECEDING AND CURRENT ROW
/// ```
///
/// For CURRENT ROW with order value `100`, the frame start is the **FIRST** row with order value `110`.
pub fn frame_start_of(
&self,
order_value: impl ToDatumRef,
order_type: OrderType,
) -> Sentinelled<Datum> {
self.start.as_ref().bound_of(order_value, order_type)
}

/// Get the frame end for a given order column value. It's very similar to `frame_start_of`, just with
/// everything on the other direction.
pub fn frame_end_of(
&self,
order_value: impl ToDatumRef,
order_type: OrderType,
) -> Sentinelled<Datum> {
self.end.as_ref().bound_of(order_value, order_type)
}

/// Get the order value of the CURRENT ROW of the first-most frame that includes the given order value.
///
/// ## Examples
///
/// For the following frames:
///
/// ```sql
/// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
/// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
/// ```
///
/// For any given order value, the first CURRENT ROW is always the first-most row, which is
/// represented by [`Sentinelled::Smallest`].
///
/// For the following frame:
///
/// ```sql
/// ORDER BY x ASC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
/// ```
///
/// For a given order value `100`, the first CURRENT ROW should have order value `90`.
///
/// For the following frame:
///
/// ```sql
/// ORDER BY x DESC RANGE BETWEEN CURRENT ROW AND 10 FOLLOWING
/// ```
///
/// For a given order value `100`, the first CURRENT ROW should have order value `110`.
pub fn first_curr_of(
&self,
order_value: impl ToDatumRef,
order_type: OrderType,
) -> Sentinelled<Datum> {
self.end
.as_ref()
.reverse()
.bound_of(order_value, order_type)
}

/// Get the order value of the CURRENT ROW of the last-most frame that includes the given order value.
/// It's very similar to `first_curr_of`, just with everything on the other direction.
pub fn last_curr_of(
&self,
order_value: impl ToDatumRef,
order_type: OrderType,
) -> Sentinelled<Datum> {
self.start
.as_ref()
.reverse()
.bound_of(order_value, order_type)
}
}

#[derive(Display, Debug, Clone, Copy, Eq, PartialEq, Hash, EnumAsInner)]
#[display(style = "TITLE CASE")]
pub enum FrameBound<T> {
UnboundedPreceding,
Expand All @@ -169,10 +317,23 @@ pub enum FrameBound<T> {
}

impl<T> FrameBound<T> {
fn validate_bounds(start: &Self, end: &Self) -> Result<()> {
fn offset_value(&self) -> Option<&T> {
match self {
UnboundedPreceding | UnboundedFollowing | CurrentRow => None,
Preceding(offset) | Following(offset) => Some(offset),
}
}

fn validate_bounds(
start: &Self,
end: &Self,
offset_checker: impl Fn(&T) -> Result<()>,
) -> Result<()> {
match (start, end) {
(_, UnboundedPreceding) => bail!("frame end cannot be UNBOUNDED PRECEDING"),
(UnboundedFollowing, _) => bail!("frame start cannot be UNBOUNDED FOLLOWING"),
(UnboundedFollowing, _) => {
bail!("frame start cannot be UNBOUNDED FOLLOWING")
}
(Following(_), CurrentRow) | (Following(_), Preceding(_)) => {
bail!("frame starting from following row cannot have preceding rows")
}
Expand All @@ -181,10 +342,32 @@ impl<T> FrameBound<T> {
}
_ => {}
}

for bound in [start, end] {
if let Some(offset) = bound.offset_value() {
offset_checker(offset)?;
}
}

Ok(())
}
}

impl<T> FrameBound<T>
where
FrameBound<T>: Copy,
{
fn reverse(self) -> FrameBound<T> {
match self {
UnboundedPreceding => UnboundedFollowing,
Preceding(offset) => Following(offset),
CurrentRow => CurrentRow,
Following(offset) => Preceding(offset),
UnboundedFollowing => UnboundedPreceding,
}
}
}

impl FrameBound<usize> {
pub fn from_protobuf(bound: &PbBound) -> Result<Self> {
use risingwave_pb::expr::window_frame::bound::PbOffset;
Expand Down Expand Up @@ -245,6 +428,85 @@ impl FrameBound<usize> {
}
}

impl FrameBound<ScalarImpl> {
fn as_ref(&self) -> FrameBound<ScalarRefImpl<'_>> {
match self {
UnboundedPreceding => UnboundedPreceding,
Preceding(offset) => Preceding(offset.as_scalar_ref_impl()),
CurrentRow => CurrentRow,
Following(offset) => Following(offset.as_scalar_ref_impl()),
UnboundedFollowing => UnboundedFollowing,
}
}

fn for_display(&self) -> FrameBound<String> {
match self {
UnboundedPreceding => UnboundedPreceding,
Preceding(offset) => Preceding(offset.as_scalar_ref_impl().to_text()),
CurrentRow => CurrentRow,
Following(offset) => Following(offset.as_scalar_ref_impl().to_text()),
UnboundedFollowing => UnboundedFollowing,
}
}
}

impl FrameBound<ScalarRefImpl<'_>> {
fn bound_of(self, order_value: impl ToDatumRef, order_type: OrderType) -> Sentinelled<Datum> {
let order_value = order_value.to_datum_ref();
match (self, order_type.direction()) {
(UnboundedPreceding, _) => Sentinelled::Smallest,
(UnboundedFollowing, _) => Sentinelled::Largest,
(CurrentRow, _) => Sentinelled::Normal(order_value.to_owned_datum()),
(Preceding(offset), Direction::Ascending)
| (Following(offset), Direction::Descending) => {
// should SUBTRACT the offset
if let Some(value) = order_value {
let res = match (value, offset) {
// TODO(): use decl macro to merge with the following
(ScalarRefImpl::Int16(val), ScalarRefImpl::Int16(off)) => {
ScalarImpl::Int16(val - off)
}
(ScalarRefImpl::Int32(val), ScalarRefImpl::Int32(off)) => {
ScalarImpl::Int32(val - off)
}
(ScalarRefImpl::Int64(val), ScalarRefImpl::Int64(off)) => {
ScalarImpl::Int64(val - off)
}
// TODO(): datetime types
_ => unreachable!("other order column data types are not supported and should be banned in frontend"),
};
Sentinelled::Normal(Some(res))
} else {
Sentinelled::Normal(None)
}
}
(Following(offset), Direction::Ascending)
| (Preceding(offset), Direction::Descending) => {
// should ADD the offset
if let Some(value) = order_value {
let res = match (value, offset) {
// TODO(): use decl macro to merge with the following
(ScalarRefImpl::Int16(val), ScalarRefImpl::Int16(off)) => {
ScalarImpl::Int16(val + off)
}
(ScalarRefImpl::Int32(val), ScalarRefImpl::Int32(off)) => {
ScalarImpl::Int32(val + off)
}
(ScalarRefImpl::Int64(val), ScalarRefImpl::Int64(off)) => {
ScalarImpl::Int64(val + off)
}
// TODO(): datetime types
_ => unreachable!("other order column data types are not supported and should be banned in frontend"),
};
Sentinelled::Normal(Some(res))
} else {
Sentinelled::Normal(None)
}
}
}
}
}

#[derive(Display, Debug, Copy, Clone, Eq, PartialEq, Hash, Default, EnumAsInner)]
#[display("EXCLUDE {}", style = "TITLE CASE")]
pub enum FrameExclusion {
Expand Down
Loading

0 comments on commit 9845051

Please sign in to comment.