diff --git a/proto/expr.proto b/proto/expr.proto index 9abb1d74f4955..602c712975ecd 100644 --- a/proto/expr.proto +++ b/proto/expr.proto @@ -452,6 +452,7 @@ message WindowFrame { TYPE_ROWS = 5; TYPE_RANGE = 10; + TYPE_SESSION = 15; } enum BoundType { BOUND_TYPE_UNSPECIFIED = 0; @@ -497,6 +498,13 @@ message WindowFrame { BoundType type = 1; optional data.Datum offset = 3; } + message SessionFrameBounds { + data.Datum gap = 1; + + data.DataType order_data_type = 10; + common.OrderType order_type = 15; + data.DataType gap_data_type = 20; + } Type type = 1; @@ -508,6 +516,7 @@ message WindowFrame { oneof bounds { RowsFrameBounds rows = 10; RangeFrameBounds range = 15; + SessionFrameBounds session = 20; } } diff --git a/src/expr/core/src/window_function/call.rs b/src/expr/core/src/window_function/call.rs index bf3178907ab7b..5c6f40ff25bc0 100644 --- a/src/expr/core/src/window_function/call.rs +++ b/src/expr/core/src/window_function/call.rs @@ -22,7 +22,9 @@ use risingwave_pb::expr::window_frame::{PbBounds, PbExclusion}; use risingwave_pb::expr::{PbWindowFrame, PbWindowFunction}; use FrameBound::{CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding}; -use super::{RangeFrameBounds, RowsFrameBound, RowsFrameBounds, WindowFuncKind}; +use super::{ + RangeFrameBounds, RowsFrameBound, RowsFrameBounds, SessionFrameBounds, WindowFuncKind, +}; use crate::aggregate::AggArgs; use crate::Result; @@ -100,6 +102,10 @@ impl Frame { let bounds = must_match!(frame.get_bounds()?, PbBounds::Range(bounds) => bounds); FrameBounds::Range(RangeFrameBounds::from_protobuf(bounds)?) } + PbType::Session => { + let bounds = must_match!(frame.get_bounds()?, PbBounds::Session(bounds) => bounds); + FrameBounds::Session(SessionFrameBounds::from_protobuf(bounds)?) + } }; let exclusion = FrameExclusion::from_protobuf(frame.get_exclusion()?)?; Ok(Self { bounds, exclusion }) @@ -108,8 +114,8 @@ impl Frame { pub fn to_protobuf(&self) -> PbWindowFrame { use risingwave_pb::expr::window_frame::PbType; let exclusion = self.exclusion.to_protobuf() as _; + #[expect(deprecated)] // because of `start` and `end` fields match &self.bounds { - #[expect(deprecated)] FrameBounds::Rows(bounds) => PbWindowFrame { r#type: PbType::Rows as _, start: None, // deprecated @@ -117,7 +123,6 @@ impl Frame { exclusion, bounds: Some(PbBounds::Rows(bounds.to_protobuf())), }, - #[expect(deprecated)] FrameBounds::Range(bounds) => PbWindowFrame { r#type: PbType::Range as _, start: None, // deprecated @@ -125,6 +130,13 @@ impl Frame { exclusion, bounds: Some(PbBounds::Range(bounds.to_protobuf())), }, + FrameBounds::Session(bounds) => PbWindowFrame { + r#type: PbType::Session as _, + start: None, // deprecated + end: None, // deprecated + exclusion, + bounds: Some(PbBounds::Session(bounds.to_protobuf())), + }, } } } @@ -135,6 +147,7 @@ pub enum FrameBounds { Rows(RowsFrameBounds), // Groups(GroupsFrameBounds), Range(RangeFrameBounds), + Session(SessionFrameBounds), } impl FrameBounds { @@ -142,6 +155,7 @@ impl FrameBounds { match self { Self::Rows(bounds) => bounds.validate(), Self::Range(bounds) => bounds.validate(), + Self::Session(bounds) => bounds.validate(), } } @@ -149,6 +163,7 @@ impl FrameBounds { match self { Self::Rows(RowsFrameBounds { start, .. }) => start.is_unbounded_preceding(), Self::Range(RangeFrameBounds { start, .. }) => start.is_unbounded_preceding(), + Self::Session(_) => false, } } @@ -156,6 +171,7 @@ impl FrameBounds { match self { Self::Rows(RowsFrameBounds { end, .. }) => end.is_unbounded_following(), Self::Range(RangeFrameBounds { end, .. }) => end.is_unbounded_following(), + Self::Session(_) => false, } } diff --git a/src/expr/core/src/window_function/mod.rs b/src/expr/core/src/window_function/mod.rs index 5f747d2296729..6bbde8c8755e6 100644 --- a/src/expr/core/src/window_function/mod.rs +++ b/src/expr/core/src/window_function/mod.rs @@ -21,6 +21,8 @@ mod rows; pub use rows::*; mod range; pub use range::*; +mod session; +pub use session::*; mod state; pub use state::*; diff --git a/src/expr/core/src/window_function/session.rs b/src/expr/core/src/window_function/session.rs new file mode 100644 index 0000000000000..81a77058759bf --- /dev/null +++ b/src/expr/core/src/window_function/session.rs @@ -0,0 +1,208 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::ops::Deref; +use std::sync::Arc; + +use anyhow::Context; +use educe::Educe; +use futures::FutureExt; +use risingwave_common::bail; +use risingwave_common::row::OwnedRow; +use risingwave_common::types::{ + DataType, Datum, IsNegative, ScalarImpl, ScalarRefImpl, ToOwnedDatum, ToText, +}; +use risingwave_common::util::sort_util::OrderType; +use risingwave_common::util::value_encoding::{DatumFromProtoExt, DatumToProtoExt}; +use risingwave_pb::expr::window_frame::PbSessionFrameBounds; + +use super::FrameBoundsImpl; +use crate::expr::{ + build_func, BoxedExpression, Expression, ExpressionBoxExt, InputRefExpression, + LiteralExpression, +}; +use crate::Result; + +/// To implement Session Window in a similar way to Range Frame, we define a similar frame bounds +/// structure here. It's very like [`RangeFrameBounds`](super::RangeFrameBounds), but with a gap +/// instead of start & end offset. +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct SessionFrameBounds { + pub order_data_type: DataType, + pub order_type: OrderType, + pub gap_data_type: DataType, + pub gap: SessionFrameGap, +} + +impl SessionFrameBounds { + pub(super) fn from_protobuf(bounds: &PbSessionFrameBounds) -> Result { + let order_data_type = DataType::from(bounds.get_order_data_type()?); + let order_type = OrderType::from_protobuf(bounds.get_order_type()?); + let gap_data_type = DataType::from(bounds.get_gap_data_type()?); + let gap_value = Datum::from_protobuf(bounds.get_gap()?, &gap_data_type) + .context("gap `Datum` is not decodable")? + .context("gap of session frame must be non-NULL")?; + let mut gap = SessionFrameGap::new(gap_value); + gap.prepare(&order_data_type, &gap_data_type)?; + Ok(Self { + order_data_type, + order_type, + gap_data_type, + gap, + }) + } + + pub(super) fn to_protobuf(&self) -> PbSessionFrameBounds { + PbSessionFrameBounds { + gap: Some(Some(self.gap.as_scalar_ref_impl()).to_protobuf()), + order_data_type: Some(self.order_data_type.to_protobuf()), + order_type: Some(self.order_type.to_protobuf()), + gap_data_type: Some(self.gap_data_type.to_protobuf()), + } + } +} + +impl Display for SessionFrameBounds { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "SESSION WITH GAP {}", + self.gap.as_scalar_ref_impl().to_text() + ) + } +} + +impl FrameBoundsImpl for SessionFrameBounds { + fn validate(&self) -> Result<()> { + // TODO(rc): maybe can merge with `RangeFrameBounds::validate` + + fn validate_non_negative(val: impl IsNegative + Display) -> Result<()> { + if val.is_negative() { + bail!("session gap should be non-negative, but {} is given", val); + } + Ok(()) + } + + match self.gap.as_scalar_ref_impl() { + ScalarRefImpl::Int16(val) => validate_non_negative(val)?, + ScalarRefImpl::Int32(val) => validate_non_negative(val)?, + ScalarRefImpl::Int64(val) => validate_non_negative(val)?, + ScalarRefImpl::Float32(val) => validate_non_negative(val)?, + ScalarRefImpl::Float64(val) => validate_non_negative(val)?, + ScalarRefImpl::Decimal(val) => validate_non_negative(val)?, + ScalarRefImpl::Interval(val) => { + if !val.is_never_negative() { + bail!( + "for session gap of type `interval`, each field should be non-negative, but {} is given", + val + ); + } + if matches!(self.order_data_type, DataType::Timestamptz) { + // for `timestamptz`, we only support gap without `month` and `day` fields + if val.months() != 0 || val.days() != 0 { + bail!( + "for session order column of type `timestamptz`, gap should not have non-zero `month` and `day`", + ); + } + } + } + _ => unreachable!( + "other order column data types are not supported and should be banned in frontend" + ), + } + Ok(()) + } +} + +impl SessionFrameBounds { + pub fn minimal_next_start_of(&self, end_order_value: impl ToOwnedDatum) -> Datum { + self.gap.for_calc().minimal_next_start_of(end_order_value) + } +} + +/// The wrapper type for [`ScalarImpl`] session gap, containing an expression to help adding the gap +/// to a given value. +#[derive(Debug, Clone, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct SessionFrameGap { + /// The original gap value. + gap: ScalarImpl, + /// Built expression for `$0 + gap`. + #[educe(PartialEq(ignore), Hash(ignore))] + add_expr: Option>, +} + +impl Deref for SessionFrameGap { + type Target = ScalarImpl; + + fn deref(&self) -> &Self::Target { + &self.gap + } +} + +impl SessionFrameGap { + pub fn new(gap: ScalarImpl) -> Self { + Self { + gap, + add_expr: None, + } + } + + fn prepare(&mut self, order_data_type: &DataType, gap_data_type: &DataType) -> Result<()> { + use risingwave_pb::expr::expr_node::PbType as PbExprType; + + let input_expr = InputRefExpression::new(order_data_type.clone(), 0); + let gap_expr = LiteralExpression::new(gap_data_type.clone(), Some(self.gap.clone())); + self.add_expr = Some(Arc::new(build_func( + PbExprType::Add, + order_data_type.clone(), + vec![input_expr.clone().boxed(), gap_expr.clone().boxed()], + )?)); + Ok(()) + } + + pub fn new_for_test( + gap: ScalarImpl, + order_data_type: &DataType, + gap_data_type: &DataType, + ) -> Self { + let mut gap = Self::new(gap); + gap.prepare(order_data_type, gap_data_type).unwrap(); + gap + } + + fn for_calc(&self) -> SessionFrameGapRef<'_> { + SessionFrameGapRef { + add_expr: self.add_expr.as_ref().unwrap().as_ref(), + } + } +} + +#[derive(Debug, Educe)] +#[educe(Clone, Copy)] +struct SessionFrameGapRef<'a> { + add_expr: &'a dyn Expression, +} + +impl<'a> SessionFrameGapRef<'a> { + fn minimal_next_start_of(&self, end_order_value: impl ToOwnedDatum) -> Datum { + let row = OwnedRow::new(vec![end_order_value.to_owned_datum()]); + self.add_expr + .eval_row(&row) + .now_or_never() + .expect("frame bound calculation should finish immediately") + .expect("just simple calculation, should succeed") // TODO(rc): handle overflow + } +} diff --git a/src/expr/impl/src/window_function/aggregate.rs b/src/expr/impl/src/window_function/aggregate.rs index 7710a1b7adb2d..9942581357f77 100644 --- a/src/expr/impl/src/window_function/aggregate.rs +++ b/src/expr/impl/src/window_function/aggregate.rs @@ -31,7 +31,7 @@ use risingwave_expr::window_function::{ use risingwave_expr::Result; use smallvec::SmallVec; -use super::buffer::{RangeWindow, RowsWindow, WindowBuffer, WindowImpl}; +use super::buffer::{RangeWindow, RowsWindow, SessionWindow, WindowBuffer, WindowImpl}; type StateValue = SmallVec<[Datum; 2]>; @@ -99,6 +99,17 @@ pub(super) fn new(call: &WindowFuncCall) -> Result { ), buffer_heap_size: KvSize::new(), }) as BoxedWindowState, + FrameBounds::Session(frame_bounds) => Box::new(AggregateState { + agg_func, + agg_impl, + arg_data_types, + buffer: WindowBuffer::>::new( + SessionWindow::new(frame_bounds.clone()), + call.frame.exclusion, + enable_delta, + ), + buffer_heap_size: KvSize::new(), + }) as BoxedWindowState, }; Ok(this) } diff --git a/src/expr/impl/src/window_function/buffer.rs b/src/expr/impl/src/window_function/buffer.rs index 6c6277c20ee63..bd1c10d162b23 100644 --- a/src/expr/impl/src/window_function/buffer.rs +++ b/src/expr/impl/src/window_function/buffer.rs @@ -18,9 +18,9 @@ use std::ops::Range; use educe::Educe; use risingwave_common::array::Op; use risingwave_common::types::Sentinelled; -use risingwave_common::util::memcmp_encoding; +use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded}; use risingwave_expr::window_function::{ - FrameExclusion, RangeFrameBounds, RowsFrameBounds, StateKey, + FrameExclusion, RangeFrameBounds, RowsFrameBounds, SessionFrameBounds, StateKey, }; use super::range_utils::{range_diff, range_except}; @@ -148,7 +148,7 @@ impl WindowBuffer { let old_outer = self.curr_window_outer(); self.buffer.push_back(Entry { key, value }); - self.recalculate_left_right(); + self.recalculate_left_right(RecalculateHint::Append); if self.curr_delta.is_some() { self.maintain_delta(old_outer, self.curr_window_outer()); @@ -161,7 +161,7 @@ impl WindowBuffer { let old_outer = self.curr_window_outer(); self.curr_idx += 1; - self.recalculate_left_right(); + self.recalculate_left_right(RecalculateHint::Slide); if self.curr_delta.is_some() { self.maintain_delta(old_outer, self.curr_window_outer()); @@ -171,6 +171,9 @@ impl WindowBuffer { self.curr_idx -= min_needed_idx; self.left_idx -= min_needed_idx; self.right_excl_idx -= min_needed_idx; + + self.window_impl.shift_indices(min_needed_idx); + self.buffer .drain(0..min_needed_idx) .map(|Entry { key, value }| (key, value)) @@ -189,14 +192,14 @@ impl WindowBuffer { } } - fn recalculate_left_right(&mut self) { + fn recalculate_left_right(&mut self, hint: RecalculateHint) { let window = BufferRefMut { buffer: &self.buffer, curr_idx: &mut self.curr_idx, left_idx: &mut self.left_idx, right_excl_idx: &mut self.right_excl_idx, }; - self.window_impl.recalculate_left_right(window); + self.window_impl.recalculate_left_right(window, hint); } } @@ -218,6 +221,12 @@ pub(super) struct BufferRefMut<'a, K: Ord, V: Clone> { right_excl_idx: &'a mut usize, } +#[derive(Clone, Copy, PartialEq, Eq)] +pub(super) enum RecalculateHint { + Append, + Slide, +} + /// A trait for sliding window implementations. This trait is used by [`WindowBuffer`] to /// determine the status of current window and how to slide the window. pub(super) trait WindowImpl { @@ -233,8 +242,16 @@ pub(super) trait WindowImpl { fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool; /// Recalculate the left and right indices of the current window, according to the latest - /// `curr_idx`. The indices are indices in the buffer vector. - fn recalculate_left_right(&self, window: BufferRefMut<'_, Self::Key, Self::Value>); + /// `curr_idx` and the hint. + fn recalculate_left_right( + &mut self, + window: BufferRefMut<'_, Self::Key, Self::Value>, + hint: RecalculateHint, + ); + + /// Shift the indices stored by the [`WindowImpl`] by `n` positions. This should be called + /// after evicting rows from the buffer. + fn shift_indices(&mut self, n: usize); } /// The sliding window implementation for `ROWS` frames. @@ -301,7 +318,11 @@ impl WindowImpl for RowsWindow { } } - fn recalculate_left_right(&self, window: BufferRefMut<'_, Self::Key, Self::Value>) { + fn recalculate_left_right( + &mut self, + window: BufferRefMut<'_, Self::Key, Self::Value>, + _hint: RecalculateHint, + ) { if window.buffer.is_empty() { *window.left_idx = 0; *window.right_excl_idx = 0; @@ -333,6 +354,8 @@ impl WindowImpl for RowsWindow { *window.right_excl_idx = window.buffer.len(); } } + + fn shift_indices(&mut self, _n: usize) {} } /// The sliding window implementation for `RANGE` frames. @@ -377,7 +400,11 @@ impl WindowImpl for RangeWindow { } } - fn recalculate_left_right(&self, window: BufferRefMut<'_, Self::Key, Self::Value>) { + fn recalculate_left_right( + &mut self, + window: BufferRefMut<'_, Self::Key, Self::Value>, + _hint: RecalculateHint, + ) { if window.buffer.is_empty() { *window.left_idx = 0; *window.right_excl_idx = 0; @@ -433,14 +460,199 @@ impl WindowImpl for RangeWindow { Sentinelled::Smallest => unreachable!("frame end never be UNBOUNDED PRECEDING"), } } + + fn shift_indices(&mut self, _n: usize) {} +} + +pub(super) struct SessionWindow { + frame_bounds: SessionFrameBounds, + /// The latest session is the rightmost session in the buffer, which is updated during appending. + latest_session: Option, + /// The sizes of recognized but not consumed sessions in the buffer. It's updated during appending. + /// The first element, if any, should be the size of the "current session window". When sliding, + /// the front should be popped. + recognized_session_sizes: VecDeque, + _phantom: std::marker::PhantomData, +} + +#[derive(Debug)] +struct LatestSession { + /// The starting index of the latest session. + start_idx: usize, + + /// Minimal next start means the minimal order value that can start a new session. + /// If a row has an order value less than this, it should be in the current session. + minimal_next_start: MemcmpEncoded, +} + +impl SessionWindow { + pub fn new(frame_bounds: SessionFrameBounds) -> Self { + Self { + frame_bounds, + latest_session: None, + recognized_session_sizes: Default::default(), + _phantom: std::marker::PhantomData, + } + } +} + +impl WindowImpl for SessionWindow { + type Key = StateKey; + type Value = V; + + fn preceding_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool { + window.curr_idx < window.buffer.len() && { + // XXX(rc): It seems that preceding saturation is not important, may remove later. + true + } + } + + fn following_saturated(&self, window: BufferRef<'_, Self::Key, Self::Value>) -> bool { + window.curr_idx < window.buffer.len() && { + // For session window, `left_idx` is always smaller than `right_excl_idx`. + assert!(window.left_idx <= window.curr_idx); + assert!(window.curr_idx < window.right_excl_idx); + + // The following expression checks whether the current window is the latest session. + // If it is, we don't think it's saturated because the next row may be still in the + // same session. Otherwise, we can safely say it's saturated. + self.latest_session + .as_ref() + .map_or(false, |LatestSession { start_idx, .. }| { + window.left_idx < *start_idx + }) + } + } + + fn recalculate_left_right( + &mut self, + window: BufferRefMut<'_, Self::Key, Self::Value>, + hint: RecalculateHint, + ) { + // Terms: + // - Session: A continuous range of rows among any two of which the difference of order values + // is less than the session gap. This is a concept on the whole stream. Sessions are recognized + // during appending. + // - Current window: The range of rows that are represented by the indices in `window`. It is a + // status of the `WindowBuffer`. If the current window happens to be the last session in the + // buffer, it will be updated during appending. Otherwise it will only be updated during sliding. + + match hint { + RecalculateHint::Append => { + assert!(!window.buffer.is_empty()); // because we just appended a row + let appended_idx = window.buffer.len() - 1; + let appended_key = &window.buffer[appended_idx].key; + + let minimal_next_start_of_appended = self.frame_bounds.minimal_next_start_of( + memcmp_encoding::decode_value( + &self.frame_bounds.order_data_type, + &appended_key.order_key, + self.frame_bounds.order_type, + ) + .expect("no reason to fail here because we just encoded it in memory"), + ); + let minimal_next_start_enc_of_appended = memcmp_encoding::encode_value( + minimal_next_start_of_appended, + self.frame_bounds.order_type, + ) + .expect("no reason to fail here"); + + if let Some(LatestSession { + ref start_idx, + minimal_next_start, + }) = self.latest_session.as_mut() + { + if &appended_key.order_key >= minimal_next_start { + // the appended row starts a new session + self.recognized_session_sizes + .push_back(appended_idx - start_idx); + self.latest_session = Some(LatestSession { + start_idx: appended_idx, + minimal_next_start: minimal_next_start_enc_of_appended, + }); + // no need to update the current window because it's now corresponding + // to some previous session + } else { + // the appended row belongs to the latest session + *minimal_next_start = minimal_next_start_enc_of_appended; + + if *start_idx == *window.left_idx { + // the current window is the latest session, we should extend it + *window.right_excl_idx = appended_idx + 1; + } + } + } else { + // no session yet, the current window should be empty + let left_idx = *window.left_idx; + let curr_idx = *window.curr_idx; + let old_right_excl_idx = *window.right_excl_idx; + assert_eq!(left_idx, curr_idx); + assert_eq!(left_idx, old_right_excl_idx); + assert_eq!(old_right_excl_idx, window.buffer.len() - 1); + + // now we put the first row into the current window + *window.right_excl_idx = window.buffer.len(); + + // and start to recognize the latest session + self.latest_session = Some(LatestSession { + start_idx: left_idx, + minimal_next_start: minimal_next_start_enc_of_appended, + }); + } + } + RecalculateHint::Slide => { + let old_left_idx = *window.left_idx; + let new_curr_idx = *window.curr_idx; + let old_right_excl_idx = *window.right_excl_idx; + + if new_curr_idx < old_right_excl_idx { + // the current row is still in the current session window, no need to slide + } else { + let old_session_size = self.recognized_session_sizes.pop_front(); + let next_session_size = self.recognized_session_sizes.front().copied(); + + if let Some(old_session_size) = old_session_size { + assert_eq!(old_session_size, old_right_excl_idx - old_left_idx); + + // slide the window to the next session + if let Some(next_session_size) = next_session_size { + // the next session is fully recognized, so we know the ending index + *window.left_idx = old_right_excl_idx; + *window.right_excl_idx = old_right_excl_idx + next_session_size; + } else { + // the next session is still in recognition, so we end the window at the end of buffer + *window.left_idx = old_right_excl_idx; + *window.right_excl_idx = window.buffer.len(); + } + } else { + // no recognized session yet, meaning the current window is the last session in the buffer + assert_eq!(old_right_excl_idx, window.buffer.len()); + *window.left_idx = old_right_excl_idx; + *window.right_excl_idx = old_right_excl_idx; + self.latest_session = None; + } + } + } + } + } + + fn shift_indices(&mut self, n: usize) { + if let Some(LatestSession { start_idx, .. }) = self.latest_session.as_mut() { + *start_idx -= n; + } + } } #[cfg(test)] mod tests { use itertools::Itertools; + use risingwave_common::row::OwnedRow; + use risingwave_common::types::{DataType, ScalarImpl}; + use risingwave_common::util::sort_util::OrderType; use risingwave_expr::window_function::FrameBound::{ CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding, }; + use risingwave_expr::window_function::SessionFrameGap; use super::*; @@ -734,4 +946,114 @@ mod tests { vec!["hello"] ); } + + #[test] + fn test_session_frame() { + let order_data_type = DataType::Int64; + let order_type = OrderType::ascending(); + let gap_data_type = DataType::Int64; + + let mut buffer = WindowBuffer::>::new( + SessionWindow::new(SessionFrameBounds { + order_data_type: order_data_type.clone(), + order_type, + gap_data_type: gap_data_type.clone(), + gap: SessionFrameGap::new_for_test( + ScalarImpl::Int64(5), + &order_data_type, + &gap_data_type, + ), + }), + FrameExclusion::NoOthers, + true, + ); + + let key = |key: i64| -> StateKey { + StateKey { + order_key: memcmp_encoding::encode_value(&Some(ScalarImpl::from(key)), order_type) + .unwrap(), + pk: OwnedRow::empty().into(), + } + }; + + assert!(buffer.curr_key().is_none()); + + buffer.append(key(1), "hello"); + buffer.append(key(3), "session"); + let window = buffer.curr_window(); + assert_eq!(window.key, Some(&key(1))); + assert!(window.preceding_saturated); + assert!(!window.following_saturated); + assert_eq!( + buffer.curr_window_values().cloned().collect_vec(), + vec!["hello", "session"] + ); + + buffer.append(key(8), "window"); // start a new session + let window = buffer.curr_window(); + assert!(window.following_saturated); + assert_eq!( + buffer.curr_window_values().cloned().collect_vec(), + vec!["hello", "session"] + ); + + buffer.append(key(15), "and"); + buffer.append(key(16), "world"); + assert_eq!( + buffer.curr_window_values().cloned().collect_vec(), + vec!["hello", "session"] + ); + + let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); + assert!(removed_keys.is_empty()); + let window = buffer.curr_window(); + assert_eq!(window.key, Some(&key(3))); + assert!(window.preceding_saturated); + assert!(window.following_saturated); + assert_eq!( + buffer.curr_window_values().cloned().collect_vec(), + vec!["hello", "session"] + ); + + let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); + assert_eq!(removed_keys, vec![key(1), key(3)]); + assert_eq!(buffer.smallest_key(), Some(&key(8))); + let window = buffer.curr_window(); + assert_eq!(window.key, Some(&key(8))); + assert!(window.preceding_saturated); + assert!(window.following_saturated); + assert_eq!( + buffer.curr_window_values().cloned().collect_vec(), + vec!["window"] + ); + + let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); + assert_eq!(removed_keys, vec![key(8)]); + assert_eq!(buffer.smallest_key(), Some(&key(15))); + let window = buffer.curr_window(); + assert_eq!(window.key, Some(&key(15))); + assert!(window.preceding_saturated); + assert!(!window.following_saturated); + assert_eq!( + buffer.curr_window_values().cloned().collect_vec(), + vec!["and", "world"] + ); + + let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); + assert!(removed_keys.is_empty()); + assert_eq!(buffer.curr_key(), Some(&key(16))); + assert_eq!( + buffer.curr_window_values().cloned().collect_vec(), + vec!["and", "world"] + ); + + let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); + assert_eq!(removed_keys, vec![key(15), key(16)]); + assert!(buffer.curr_key().is_none()); + assert!(buffer + .curr_window_values() + .cloned() + .collect_vec() + .is_empty()); + } }