Skip to content

Commit

Permalink
feat(over window): WindowBuffer support for session window (#17067)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Jun 13, 2024
1 parent 2c6f7c3 commit a3da000
Show file tree
Hide file tree
Showing 6 changed files with 582 additions and 14 deletions.
9 changes: 9 additions & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ message WindowFrame {

TYPE_ROWS = 5;
TYPE_RANGE = 10;
TYPE_SESSION = 15;
}
enum BoundType {
BOUND_TYPE_UNSPECIFIED = 0;
Expand Down Expand Up @@ -497,6 +498,13 @@ message WindowFrame {
BoundType type = 1;
optional data.Datum offset = 3;
}
message SessionFrameBounds {
data.Datum gap = 1;

data.DataType order_data_type = 10;
common.OrderType order_type = 15;
data.DataType gap_data_type = 20;
}

Type type = 1;

Expand All @@ -508,6 +516,7 @@ message WindowFrame {
oneof bounds {
RowsFrameBounds rows = 10;
RangeFrameBounds range = 15;
SessionFrameBounds session = 20;
}
}

Expand Down
22 changes: 19 additions & 3 deletions src/expr/core/src/window_function/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use risingwave_pb::expr::window_frame::{PbBounds, PbExclusion};
use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction};
use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding};

use super::{RangeFrameBounds, RowsFrameBound, RowsFrameBounds, WindowFuncKind};
use super::{
RangeFrameBounds, RowsFrameBound, RowsFrameBounds, SessionFrameBounds, WindowFuncKind,
};
use crate::aggregate::AggArgs;
use crate::Result;

Expand Down Expand Up @@ -100,6 +102,10 @@ impl Frame {
let bounds = must_match!(frame.get_bounds()?, PbBounds::Range(bounds) => bounds);
FrameBounds::Range(RangeFrameBounds::from_protobuf(bounds)?)
}
PbType::Session => {
let bounds = must_match!(frame.get_bounds()?, PbBounds::Session(bounds) => bounds);
FrameBounds::Session(SessionFrameBounds::from_protobuf(bounds)?)
}
};
let exclusion = FrameExclusion::from_protobuf(frame.get_exclusion()?)?;
Ok(Self { bounds, exclusion })
Expand All @@ -108,23 +114,29 @@ impl Frame {
pub fn to_protobuf(&self) -> PbWindowFrame {
use risingwave_pb::expr::window_frame::PbType;
let exclusion = self.exclusion.to_protobuf() as _;
#[expect(deprecated)] // because of `start` and `end` fields
match &self.bounds {
#[expect(deprecated)]
FrameBounds::Rows(bounds) => PbWindowFrame {
r#type: PbType::Rows as _,
start: None, // deprecated
end: None, // deprecated
exclusion,
bounds: Some(PbBounds::Rows(bounds.to_protobuf())),
},
#[expect(deprecated)]
FrameBounds::Range(bounds) => PbWindowFrame {
r#type: PbType::Range as _,
start: None, // deprecated
end: None, // deprecated
exclusion,
bounds: Some(PbBounds::Range(bounds.to_protobuf())),
},
FrameBounds::Session(bounds) => PbWindowFrame {
r#type: PbType::Session as _,
start: None, // deprecated
end: None, // deprecated
exclusion,
bounds: Some(PbBounds::Session(bounds.to_protobuf())),
},
}
}
}
Expand All @@ -135,27 +147,31 @@ pub enum FrameBounds {
Rows(RowsFrameBounds),
// Groups(GroupsFrameBounds),
Range(RangeFrameBounds),
Session(SessionFrameBounds),
}

impl FrameBounds {
pub fn validate(&self) -> Result<()> {
match self {
Self::Rows(bounds) => bounds.validate(),
Self::Range(bounds) => bounds.validate(),
Self::Session(bounds) => bounds.validate(),
}
}

pub fn start_is_unbounded(&self) -> bool {
match self {
Self::Rows(RowsFrameBounds { start, .. }) => start.is_unbounded_preceding(),
Self::Range(RangeFrameBounds { start, .. }) => start.is_unbounded_preceding(),
Self::Session(_) => false,
}
}

pub fn end_is_unbounded(&self) -> bool {
match self {
Self::Rows(RowsFrameBounds { end, .. }) => end.is_unbounded_following(),
Self::Range(RangeFrameBounds { end, .. }) => end.is_unbounded_following(),
Self::Session(_) => false,
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/expr/core/src/window_function/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ mod rows;
pub use rows::*;
mod range;
pub use range::*;
mod session;
pub use session::*;

mod state;
pub use state::*;
Expand Down
208 changes: 208 additions & 0 deletions src/expr/core/src/window_function/session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::Display;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::Context;
use educe::Educe;
use futures::FutureExt;
use risingwave_common::bail;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::{
DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, ToOwnedDatum, ToText,
};
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt};
use risingwave_pb::expr::window_frame::PbSessionFrameBounds;

use super::FrameBoundsImpl;
use crate::expr::{
build_func, BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression,
LiteralExpression,
};
use crate::Result;

/// To implement Session Window in a similar way to Range Frame, we define a similar frame bounds
/// structure here. It's very like [`RangeFrameBounds`](super::RangeFrameBounds), but with a gap
/// instead of start & end offset.
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct SessionFrameBounds {
pub order_data_type: DataType,
pub order_type: OrderType,
pub gap_data_type: DataType,
pub gap: SessionFrameGap,
}

impl SessionFrameBounds {
pub(super) fn from_protobuf(bounds: &PbSessionFrameBounds) -> Result<Self> {
let order_data_type = DataType::from(bounds.get_order_data_type()?);
let order_type = OrderType::from_protobuf(bounds.get_order_type()?);
let gap_data_type = DataType::from(bounds.get_gap_data_type()?);
let gap_value = Datum::from_protobuf(bounds.get_gap()?, &gap_data_type)
.context("gap `Datum` is not decodable")?
.context("gap of session frame must be non-NULL")?;
let mut gap = SessionFrameGap::new(gap_value);
gap.prepare(&order_data_type, &gap_data_type)?;
Ok(Self {
order_data_type,
order_type,
gap_data_type,
gap,
})
}

pub(super) fn to_protobuf(&self) -> PbSessionFrameBounds {
PbSessionFrameBounds {
gap: Some(Some(self.gap.as_scalar_ref_impl()).to_protobuf()),
order_data_type: Some(self.order_data_type.to_protobuf()),
order_type: Some(self.order_type.to_protobuf()),
gap_data_type: Some(self.gap_data_type.to_protobuf()),
}
}
}

impl Display for SessionFrameBounds {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"SESSION WITH GAP {}",
self.gap.as_scalar_ref_impl().to_text()
)
}
}

impl FrameBoundsImpl for SessionFrameBounds {
fn validate(&self) -> Result<()> {
// TODO(rc): maybe can merge with `RangeFrameBounds::validate`

fn validate_non_negative(val: impl IsNegative + Display) -> Result<()> {
if val.is_negative() {
bail!("session gap should be non-negative, but {} is given", val);
}
Ok(())
}

match self.gap.as_scalar_ref_impl() {
ScalarRefImpl::Int16(val) => validate_non_negative(val)?,
ScalarRefImpl::Int32(val) => validate_non_negative(val)?,
ScalarRefImpl::Int64(val) => validate_non_negative(val)?,
ScalarRefImpl::Float32(val) => validate_non_negative(val)?,
ScalarRefImpl::Float64(val) => validate_non_negative(val)?,
ScalarRefImpl::Decimal(val) => validate_non_negative(val)?,
ScalarRefImpl::Interval(val) => {
if !val.is_never_negative() {
bail!(
"for session gap of type `interval`, each field should be non-negative, but {} is given",
val
);
}
if matches!(self.order_data_type, DataType::Timestamptz) {
// for `timestamptz`, we only support gap without `month` and `day` fields
if val.months() != 0 || val.days() != 0 {
bail!(
"for session order column of type `timestamptz`, gap should not have non-zero `month` and `day`",
);
}
}
}
_ => unreachable!(
"other order column data types are not supported and should be banned in frontend"
),
}
Ok(())
}
}

impl SessionFrameBounds {
pub fn minimal_next_start_of(&self, end_order_value: impl ToOwnedDatum) -> Datum {
self.gap.for_calc().minimal_next_start_of(end_order_value)
}
}

/// The wrapper type for [`ScalarImpl`] session gap, containing an expression to help adding the gap
/// to a given value.
#[derive(Debug, Clone, Educe)]
#[educe(PartialEq, Eq, Hash)]
pub struct SessionFrameGap {
/// The original gap value.
gap: ScalarImpl,
/// Built expression for `$0 + gap`.
#[educe(PartialEq(ignore), Hash(ignore))]
add_expr: Option<Arc<BoxedExpression>>,
}

impl Deref for SessionFrameGap {
type Target = ScalarImpl;

fn deref(&self) -> &Self::Target {
&self.gap
}
}

impl SessionFrameGap {
pub fn new(gap: ScalarImpl) -> Self {
Self {
gap,
add_expr: None,
}
}

fn prepare(&mut self, order_data_type: &DataType, gap_data_type: &DataType) -> Result<()> {
use risingwave_pb::expr::expr_node::PbType as PbExprType;

let input_expr = InputRefExpression::new(order_data_type.clone(), 0);
let gap_expr = LiteralExpression::new(gap_data_type.clone(), Some(self.gap.clone()));
self.add_expr = Some(Arc::new(build_func(
PbExprType::Add,
order_data_type.clone(),
vec![input_expr.clone().boxed(), gap_expr.clone().boxed()],
)?));
Ok(())
}

pub fn new_for_test(
gap: ScalarImpl,
order_data_type: &DataType,
gap_data_type: &DataType,
) -> Self {
let mut gap = Self::new(gap);
gap.prepare(order_data_type, gap_data_type).unwrap();
gap
}

fn for_calc(&self) -> SessionFrameGapRef<'_> {
SessionFrameGapRef {
add_expr: self.add_expr.as_ref().unwrap().as_ref(),
}
}
}

#[derive(Debug, Educe)]
#[educe(Clone, Copy)]
struct SessionFrameGapRef<'a> {
add_expr: &'a dyn Expression,
}

impl<'a> SessionFrameGapRef<'a> {
fn minimal_next_start_of(&self, end_order_value: impl ToOwnedDatum) -> Datum {
let row = OwnedRow::new(vec![end_order_value.to_owned_datum()]);
self.add_expr
.eval_row(&row)
.now_or_never()
.expect("frame bound calculation should finish immediately")
.expect("just simple calculation, should succeed") // TODO(rc): handle overflow
}
}
13 changes: 12 additions & 1 deletion src/expr/impl/src/window_function/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_expr::window_function::{
use risingwave_expr::Result;
use smallvec::SmallVec;

use super::buffer::{RangeWindow, RowsWindow, WindowBuffer, WindowImpl};
use super::buffer::{RangeWindow, RowsWindow, SessionWindow, WindowBuffer, WindowImpl};

type StateValue = SmallVec<[Datum; 2]>;

Expand Down Expand Up @@ -99,6 +99,17 @@ pub(super) fn new(call: &WindowFuncCall) -> Result<BoxedWindowState> {
),
buffer_heap_size: KvSize::new(),
}) as BoxedWindowState,
FrameBounds::Session(frame_bounds) => Box::new(AggregateState {
agg_func,
agg_impl,
arg_data_types,
buffer: WindowBuffer::<SessionWindow<StateValue>>::new(
SessionWindow::new(frame_bounds.clone()),
call.frame.exclusion,
enable_delta,
),
buffer_heap_size: KvSize::new(),
}) as BoxedWindowState,
};
Ok(this)
}
Expand Down
Loading

0 comments on commit a3da000

Please sign in to comment.