diff --git a/Cargo.lock b/Cargo.lock index 85c4b8d425a5c..bff2c4de23d31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9171,6 +9171,7 @@ dependencies = [ "chrono", "downcast-rs", "easy-ext", + "educe 0.5.7", "either", "enum-as-inner", "expect-test", diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index 51972f282826c..a1dea253af9c0 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -31,6 +31,7 @@ chrono = { version = "0.4", default-features = false, features = [ ] } downcast-rs = "1.2" easy-ext = "1" +educe = "0.5" either = "1" enum-as-inner = "0.6" futures-async-stream = { workspace = true } diff --git a/src/expr/core/src/window_function/call.rs b/src/expr/core/src/window_function/call.rs index 9c56bdbf66cf5..6c5839c07bc46 100644 --- a/src/expr/core/src/window_function/call.rs +++ b/src/expr/core/src/window_function/call.rs @@ -143,6 +143,10 @@ impl FrameBounds { } } +pub trait FrameBoundsImpl { + fn validate(&self) -> Result<()>; +} + #[derive(Display, Debug, Clone, Eq, PartialEq, Hash)] #[display("ROWS BETWEEN {start} AND {end}")] pub struct RowsFrameBounds { @@ -151,10 +155,6 @@ pub struct RowsFrameBounds { } impl RowsFrameBounds { - fn validate(&self) -> Result<()> { - FrameBound::validate_bounds(&self.start, &self.end) - } - /// Check if the `ROWS` frame is canonical. /// /// A canonical `ROWS` frame is defined as: @@ -190,6 +190,12 @@ impl RowsFrameBounds { } } +impl FrameBoundsImpl for RowsFrameBounds { + fn validate(&self) -> Result<()> { + FrameBound::validate_bounds(&self.start, &self.end) + } +} + #[derive(Display, Debug, Clone, Eq, PartialEq, Hash, EnumAsInner)] #[display(style = "TITLE CASE")] pub enum FrameBound { diff --git a/src/expr/core/src/window_function/state/aggregate.rs b/src/expr/core/src/window_function/state/aggregate.rs index 09555eecf9201..7d3f4fe20fe57 100644 --- a/src/expr/core/src/window_function/state/aggregate.rs +++ b/src/expr/core/src/window_function/state/aggregate.rs @@ -22,66 +22,82 @@ use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::{bail, must_match}; use smallvec::SmallVec; -use super::buffer::WindowBuffer; -use super::{StateEvictHint, StateKey, StatePos, WindowState}; +use super::buffer::{RowsWindow, WindowBuffer, WindowImpl}; +use super::{BoxedWindowState, StateEvictHint, StateKey, StatePos, WindowState}; use crate::aggregate::{ AggArgs, AggCall, AggregateFunction, AggregateState as AggImplState, BoxedAggregateFunction, }; use crate::sig::FUNCTION_REGISTRY; -use crate::window_function::{WindowFuncCall, WindowFuncKind}; +use crate::window_function::{FrameBounds, WindowFuncCall, WindowFuncKind}; use crate::Result; -pub struct AggregateState { +type StateValue = SmallVec<[Datum; 2]>; + +struct AggregateState +where + W: WindowImpl, +{ agg_func: BoxedAggregateFunction, agg_impl: AggImpl, arg_data_types: Vec, - buffer: WindowBuffer>, + buffer: WindowBuffer, buffer_heap_size: KvSize, } -impl AggregateState { - pub fn new(call: &WindowFuncCall) -> Result { - if call.frame.bounds.validate().is_err() { - bail!("the window frame must be valid"); - } - let agg_kind = must_match!(call.kind, WindowFuncKind::Aggregate(agg_kind) => agg_kind); - let arg_data_types = call.args.arg_types().to_vec(); - let agg_call = AggCall { - kind: agg_kind, - args: match &call.args { - // convert args to [0] or [0, 1] - AggArgs::None => AggArgs::None, - AggArgs::Unary(data_type, _) => AggArgs::Unary(data_type.to_owned(), 0), - AggArgs::Binary(data_types, _) => AggArgs::Binary(data_types.to_owned(), [0, 1]), - }, - return_type: call.return_type.clone(), - column_orders: Vec::new(), // the input is already sorted - // TODO(rc): support filter on window function call - filter: None, - // TODO(rc): support distinct on window function call? PG doesn't support it either. - distinct: false, - direct_args: vec![], +pub(super) fn new(call: &WindowFuncCall) -> Result { + if call.frame.bounds.validate().is_err() { + bail!("the window frame must be valid"); + } + let agg_kind = must_match!(call.kind, WindowFuncKind::Aggregate(agg_kind) => agg_kind); + let arg_data_types = call.args.arg_types().to_vec(); + let agg_call = AggCall { + kind: agg_kind, + args: match &call.args { + // convert args to [0] or [0, 1] + AggArgs::None => AggArgs::None, + AggArgs::Unary(data_type, _) => AggArgs::Unary(data_type.to_owned(), 0), + AggArgs::Binary(data_types, _) => AggArgs::Binary(data_types.to_owned(), [0, 1]), + }, + return_type: call.return_type.clone(), + column_orders: Vec::new(), // the input is already sorted + // TODO(rc): support filter on window function call + filter: None, + // TODO(rc): support distinct on window function call? PG doesn't support it either. + distinct: false, + direct_args: vec![], + }; + let agg_func_sig = FUNCTION_REGISTRY + .get(agg_kind, &arg_data_types, &call.return_type) + .expect("the agg func must exist"); + let agg_func = agg_func_sig.build_aggregate(&agg_call)?; + let (agg_impl, enable_delta) = + if agg_func_sig.is_retractable() && call.frame.exclusion.is_no_others() { + let init_state = agg_func.create_state(); + (AggImpl::Incremental(init_state), true) + } else { + (AggImpl::Full, false) }; - let agg_func_sig = FUNCTION_REGISTRY - .get(agg_kind, &arg_data_types, &call.return_type) - .expect("the agg func must exist"); - let agg_func = agg_func_sig.build_aggregate(&agg_call)?; - let (agg_impl, enable_delta) = - if agg_func_sig.is_retractable() && call.frame.exclusion.is_no_others() { - let init_state = agg_func.create_state(); - (AggImpl::Incremental(init_state), true) - } else { - (AggImpl::Full, false) - }; - Ok(Self { + + let this = match &call.frame.bounds { + FrameBounds::Rows(frame_bounds) => Box::new(AggregateState { agg_func, agg_impl, arg_data_types, - buffer: WindowBuffer::new(call.frame.clone(), enable_delta), + buffer: WindowBuffer::>::new( + RowsWindow::new(frame_bounds.clone()), + call.frame.exclusion, + enable_delta, + ), buffer_heap_size: KvSize::new(), - }) - } + }) as BoxedWindowState, + }; + Ok(this) +} +impl AggregateState +where + W: WindowImpl, +{ fn slide_inner(&mut self) -> StateEvictHint { let removed_keys: BTreeSet<_> = self .buffer @@ -107,7 +123,10 @@ impl AggregateState { } } -impl WindowState for AggregateState { +impl WindowState for AggregateState +where + W: WindowImpl, +{ fn append(&mut self, key: StateKey, args: SmallVec<[Datum; 2]>) { args.iter().for_each(|arg| { self.buffer_heap_size.add_val(arg); @@ -156,7 +175,10 @@ impl WindowState for AggregateState { } } -impl EstimateSize for AggregateState { +impl EstimateSize for AggregateState +where + W: WindowImpl, +{ fn estimated_heap_size(&self) -> usize { // estimate `VecDeque` of `StreamWindowBuffer` internal size // https://github.com/risingwavelabs/risingwave/issues/9713 diff --git a/src/expr/core/src/window_function/state/buffer.rs b/src/expr/core/src/window_function/state/buffer.rs index 3edb6d7adc164..34b727fbd475e 100644 --- a/src/expr/core/src/window_function/state/buffer.rs +++ b/src/expr/core/src/window_function/state/buffer.rs @@ -15,43 +15,52 @@ use std::collections::VecDeque; use std::ops::Range; +use educe::Educe; use risingwave_common::array::Op; use super::range_utils::range_except; use crate::window_function::state::range_utils::range_diff; -use crate::window_function::{Frame, FrameBounds, FrameExclusion}; - -struct Entry { - key: K, - value: V, -} +use crate::window_function::{FrameExclusion, RowsFrameBounds}; /// A common sliding window buffer. -pub struct WindowBuffer { - frame: Frame, - buffer: VecDeque>, +pub(super) struct WindowBuffer { + window_impl: W, + frame_exclusion: FrameExclusion, + buffer: VecDeque>, curr_idx: usize, left_idx: usize, // inclusive, note this can be > `curr_idx` right_excl_idx: usize, // exclusive, note this can be <= `curr_idx` - curr_delta: Option>, + curr_delta: Option>, +} + +/// A key-value pair in the buffer. +struct Entry { + key: K, + value: V, } /// Note: A window frame can be pure preceding, pure following, or acrossing the _current row_. -pub struct CurrWindow<'a, K> { +pub(super) struct CurrWindow<'a, K> { pub key: Option<&'a K>, + + // XXX(rc): Maybe will be used in the future, let's keep it for now. + #[cfg_attr(not(test), expect(dead_code))] + /// The preceding half of the current window is saturated. pub preceding_saturated: bool, + /// The following half of the current window is saturated. pub following_saturated: bool, } -impl WindowBuffer { - pub fn new(frame: Frame, enable_delta: bool) -> Self { - assert!(frame.bounds.validate().is_ok()); +impl WindowBuffer { + pub fn new(window_impl: W, frame_exclusion: FrameExclusion, enable_delta: bool) -> Self { if enable_delta { // TODO(rc): currently only support `FrameExclusion::NoOthers` for delta - assert!(frame.exclusion.is_no_others()); + assert!(frame_exclusion.is_no_others()); } + Self { - frame, + window_impl, + frame_exclusion, buffer: Default::default(), curr_idx: 0, left_idx: 0, @@ -64,66 +73,29 @@ impl WindowBuffer { } } - fn preceding_saturated(&self) -> bool { - self.curr_key().is_some() - && match &self.frame.bounds { - FrameBounds::Rows(bounds) => { - let start_off = bounds.start.to_offset(); - if let Some(start_off) = start_off { - if start_off >= 0 { - true // pure following frame, always preceding-saturated - } else { - // FIXME(rc): Clippy rule `clippy::nonminimal_bool` is misreporting that - // the following can be simplified. - #[allow(clippy::nonminimal_bool)] - { - assert!(self.curr_idx >= self.left_idx); - } - self.curr_idx - self.left_idx >= start_off.unsigned_abs() - } - } else { - false // unbounded frame start, never preceding-saturated - } - } - } - } - - fn following_saturated(&self) -> bool { - self.curr_key().is_some() - && match &self.frame.bounds { - FrameBounds::Rows(bounds) => { - let end_off = bounds.end.to_offset(); - if let Some(end_off) = end_off { - if end_off <= 0 { - true // pure preceding frame, always following-saturated - } else { - // FIXME(rc): Ditto. - #[allow(clippy::nonminimal_bool)] - { - assert!(self.right_excl_idx > 0); - assert!(self.right_excl_idx > self.curr_idx); - assert!(self.right_excl_idx <= self.buffer.len()); - } - self.right_excl_idx - 1 - self.curr_idx >= end_off as usize - } - } else { - false // unbounded frame end, never following-saturated - } - } - } + /// Get the smallest key that is still kept in the buffer. + /// Returns `None` if there's nothing yet. + pub fn smallest_key(&self) -> Option<&W::Key> { + self.buffer.front().map(|Entry { key, .. }| key) } /// Get the key part of the current row. - pub fn curr_key(&self) -> Option<&K> { + pub fn curr_key(&self) -> Option<&W::Key> { self.buffer.get(self.curr_idx).map(|Entry { key, .. }| key) } /// Get the current window info. - pub fn curr_window(&self) -> CurrWindow<'_, K> { + pub fn curr_window(&self) -> CurrWindow<'_, W::Key> { + let buffer_ref = BufferRef { + buffer: &self.buffer, + curr_idx: self.curr_idx, + left_idx: self.left_idx, + right_excl_idx: self.right_excl_idx, + }; CurrWindow { key: self.curr_key(), - preceding_saturated: self.preceding_saturated(), - following_saturated: self.following_saturated(), + preceding_saturated: self.window_impl.preceding_saturated(buffer_ref), + following_saturated: self.window_impl.following_saturated(buffer_ref), } } @@ -133,7 +105,7 @@ impl WindowBuffer { fn curr_window_exclusion(&self) -> Range { // TODO(rc): should intersect with `curr_window_outer` to be more accurate - match self.frame.exclusion { + match self.frame_exclusion { FrameExclusion::CurrentRow => self.curr_idx..self.curr_idx + 1, FrameExclusion::NoOthers => self.curr_idx..self.curr_idx, } @@ -146,7 +118,7 @@ impl WindowBuffer { } /// Iterate over values in the current window. - pub fn curr_window_values(&self) -> impl Iterator { + pub fn curr_window_values(&self) -> impl Iterator { assert!(self.left_idx <= self.right_excl_idx); assert!(self.right_excl_idx <= self.buffer.len()); @@ -159,69 +131,17 @@ impl WindowBuffer { /// Consume the delta of values comparing the current window to the previous window. /// The delta is not guaranteed to be sorted, especially when frame exclusion is not `NoOthers`. - pub fn consume_curr_window_values_delta(&mut self) -> impl Iterator + '_ { + pub fn consume_curr_window_values_delta( + &mut self, + ) -> impl Iterator + '_ { self.curr_delta .as_mut() .expect("delta mode should be enabled") .drain(..) } - fn recalculate_left_right(&mut self) { - // TODO(rc): For the sake of simplicity, we just recalculate the left and right indices from - // `curr_idx`, rather than trying to update them incrementally. The complexity is O(n) for - // `Frame::Range` where n is the length of the buffer, for now it doesn't matter. - - if self.buffer.is_empty() { - self.left_idx = 0; - self.right_excl_idx = 0; - } - - match &self.frame.bounds { - FrameBounds::Rows(bounds) => { - let start_off = bounds.start.to_offset(); - let end_off = bounds.end.to_offset(); - if let Some(start_off) = start_off { - let logical_left_idx = self.curr_idx as isize + start_off; - if logical_left_idx >= 0 { - self.left_idx = std::cmp::min(logical_left_idx as usize, self.buffer.len()); - } else { - self.left_idx = 0; - } - } else { - // unbounded start - self.left_idx = 0; - } - if let Some(end_off) = end_off { - let logical_right_excl_idx = self.curr_idx as isize + end_off + 1; - if logical_right_excl_idx >= 0 { - self.right_excl_idx = - std::cmp::min(logical_right_excl_idx as usize, self.buffer.len()); - } else { - self.right_excl_idx = 0; - } - } else { - // unbounded end - self.right_excl_idx = self.buffer.len(); - } - } - } - } - - fn maintain_delta(&mut self, old_outer: Range, new_outer: Range) { - debug_assert!(self.frame.exclusion.is_no_others()); - - let (outer_removed, outer_added) = range_diff(old_outer.clone(), new_outer.clone()); - let delta = self.curr_delta.as_mut().unwrap(); - for idx in outer_removed.iter().cloned().flatten() { - delta.push((Op::Delete, self.buffer[idx].value.clone())); - } - for idx in outer_added.iter().cloned().flatten() { - delta.push((Op::Insert, self.buffer[idx].value.clone())); - } - } - /// Append a key-value pair to the buffer. - pub fn append(&mut self, key: K, value: V) { + pub fn append(&mut self, key: W::Key, value: W::Value) { let old_outer = self.curr_window_outer(); self.buffer.push_back(Entry { key, value }); @@ -232,15 +152,9 @@ impl WindowBuffer { } } - /// Get the smallest key that is still kept in the buffer. - /// Returns `None` if there's nothing yet. - pub fn smallest_key(&self) -> Option<&K> { - self.buffer.front().map(|Entry { key, .. }| key) - } - /// Slide the current window forward. /// Returns the keys that are removed from the buffer. - pub fn slide(&mut self) -> impl Iterator + '_ { + pub fn slide(&mut self) -> impl Iterator + '_ { let old_outer = self.curr_window_outer(); self.curr_idx += 1; @@ -258,6 +172,165 @@ impl WindowBuffer { .drain(0..min_needed_idx) .map(|Entry { key, value }| (key, value)) } + + fn maintain_delta(&mut self, old_outer: Range, new_outer: Range) { + debug_assert!(self.frame_exclusion.is_no_others()); + + let (outer_removed, outer_added) = range_diff(old_outer.clone(), new_outer.clone()); + let delta = self.curr_delta.as_mut().unwrap(); + for idx in outer_removed.iter().cloned().flatten() { + delta.push((Op::Delete, self.buffer[idx].value.clone())); + } + for idx in outer_added.iter().cloned().flatten() { + delta.push((Op::Insert, self.buffer[idx].value.clone())); + } + } + + fn recalculate_left_right(&mut self) { + let buffer_ref = BufferRefMut { + buffer: &self.buffer, + curr_idx: &mut self.curr_idx, + left_idx: &mut self.left_idx, + right_excl_idx: &mut self.right_excl_idx, + }; + self.window_impl.recalculate_left_right(buffer_ref); + } +} + +/// Wraps a reference to the buffer and some indices, to be used by [`WindowImpl`]s. +#[derive(Educe)] +#[educe(Clone, Copy)] +pub(super) struct BufferRef<'a, K: Ord, V: Clone> { + buffer: &'a VecDeque>, + curr_idx: usize, + left_idx: usize, + right_excl_idx: usize, +} + +/// Wraps a reference to the buffer and some mutable indices, to be used by [`WindowImpl`]s. +pub(super) struct BufferRefMut<'a, K: Ord, V: Clone> { + buffer: &'a VecDeque>, + curr_idx: &'a mut usize, + left_idx: &'a mut usize, + right_excl_idx: &'a mut usize, +} + +/// A trait for sliding window implementations. This trait is used by [`WindowBuffer`] to +/// determine the status of current window and how to slide the window. +pub(super) trait WindowImpl { + type Key: Ord; + type Value: Clone; + + /// Whether the preceding half of the current window is saturated. + /// By "saturated" we mean that every row that is possible to be in the preceding half of the + /// current window is already in the buffer. + fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool; + + /// Whether the following half of the current window is saturated. + fn following_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool; + + /// Recalculate the left and right indices of the current window, according to the latest + /// `curr_idx`. The indices are indices in the buffer vector. + fn recalculate_left_right(&self, buffer_ref: BufferRefMut<'_, Self::Key, Self::Value>); +} + +/// The sliding window implementation for `ROWS` frames. +pub(super) struct RowsWindow { + frame_bounds: RowsFrameBounds, + _phantom: std::marker::PhantomData, + _phantom2: std::marker::PhantomData, +} + +impl RowsWindow { + pub fn new(frame_bounds: RowsFrameBounds) -> Self { + Self { + frame_bounds, + _phantom: std::marker::PhantomData, + _phantom2: std::marker::PhantomData, + } + } +} + +impl WindowImpl for RowsWindow { + type Key = K; + type Value = V; + + fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool { + buffer_ref.curr_idx < buffer_ref.buffer.len() && { + let start_off = self.frame_bounds.start.to_offset(); + if let Some(start_off) = start_off { + if start_off >= 0 { + true // pure following frame, always preceding-saturated + } else { + // FIXME(rc): Clippy rule `clippy::nonminimal_bool` is misreporting that + // the following can be simplified. + #[allow(clippy::nonminimal_bool)] + { + assert!(buffer_ref.curr_idx >= buffer_ref.left_idx); + } + buffer_ref.curr_idx - buffer_ref.left_idx >= start_off.unsigned_abs() + } + } else { + false // unbounded frame start, never preceding-saturated + } + } + } + + fn following_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool { + buffer_ref.curr_idx < buffer_ref.buffer.len() && { + let end_off = self.frame_bounds.end.to_offset(); + if let Some(end_off) = end_off { + if end_off <= 0 { + true // pure preceding frame, always following-saturated + } else { + // FIXME(rc): Ditto. + #[allow(clippy::nonminimal_bool)] + { + assert!(buffer_ref.right_excl_idx > 0); + assert!(buffer_ref.right_excl_idx > buffer_ref.curr_idx); + assert!(buffer_ref.right_excl_idx <= buffer_ref.buffer.len()); + } + buffer_ref.right_excl_idx - 1 - buffer_ref.curr_idx >= end_off as usize + } + } else { + false // unbounded frame end, never following-saturated + } + } + } + + fn recalculate_left_right(&self, buffer_ref: BufferRefMut<'_, Self::Key, Self::Value>) { + if buffer_ref.buffer.is_empty() { + *buffer_ref.left_idx = 0; + *buffer_ref.right_excl_idx = 0; + } + + let start_off = self.frame_bounds.start.to_offset(); + let end_off = self.frame_bounds.end.to_offset(); + if let Some(start_off) = start_off { + let logical_left_idx = *buffer_ref.curr_idx as isize + start_off; + if logical_left_idx >= 0 { + *buffer_ref.left_idx = + std::cmp::min(logical_left_idx as usize, buffer_ref.buffer.len()); + } else { + *buffer_ref.left_idx = 0; + } + } else { + // unbounded start + *buffer_ref.left_idx = 0; + } + if let Some(end_off) = end_off { + let logical_right_excl_idx = *buffer_ref.curr_idx as isize + end_off + 1; + if logical_right_excl_idx >= 0 { + *buffer_ref.right_excl_idx = + std::cmp::min(logical_right_excl_idx as usize, buffer_ref.buffer.len()); + } else { + *buffer_ref.right_excl_idx = 0; + } + } else { + // unbounded end + *buffer_ref.right_excl_idx = buffer_ref.buffer.len(); + } + } } #[cfg(test)] @@ -265,12 +338,18 @@ mod tests { use itertools::Itertools; use super::*; - use crate::window_function::{Frame, FrameBound}; + use crate::window_function::FrameBound::{ + CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding, + }; #[test] fn test_rows_frame_unbounded_preceding_to_current_row() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow), + let mut buffer = WindowBuffer::>::new( + RowsWindow::new(RowsFrameBounds { + start: UnboundedPreceding, + end: CurrentRow, + }), + FrameExclusion::NoOthers, true, ); @@ -303,8 +382,12 @@ mod tests { #[test] fn test_rows_frame_preceding_to_current_row() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::Preceding(1), FrameBound::CurrentRow), + let mut buffer = WindowBuffer::>::new( + RowsWindow::new(RowsFrameBounds { + start: Preceding(1), + end: CurrentRow, + }), + FrameExclusion::NoOthers, true, ); @@ -342,8 +425,12 @@ mod tests { #[test] fn test_rows_frame_preceding_to_preceding() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::Preceding(2), FrameBound::Preceding(1)), + let mut buffer = WindowBuffer::>::new( + RowsWindow::new(RowsFrameBounds { + start: Preceding(2), + end: Preceding(1), + }), + FrameExclusion::NoOthers, true, ); @@ -384,8 +471,12 @@ mod tests { #[test] fn test_rows_frame_current_row_to_unbounded_following() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::CurrentRow, FrameBound::UnboundedFollowing), + let mut buffer = WindowBuffer::>::new( + RowsWindow::new(RowsFrameBounds { + start: CurrentRow, + end: UnboundedFollowing, + }), + FrameExclusion::NoOthers, true, ); @@ -422,8 +513,12 @@ mod tests { #[test] fn test_rows_frame_current_row_to_following() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::CurrentRow, FrameBound::Following(1)), + let mut buffer = WindowBuffer::>::new( + RowsWindow::new(RowsFrameBounds { + start: CurrentRow, + end: Following(1), + }), + FrameExclusion::NoOthers, true, ); @@ -468,8 +563,12 @@ mod tests { #[test] fn test_rows_frame_following_to_following() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::Following(1), FrameBound::Following(2)), + let mut buffer = WindowBuffer::>::new( + RowsWindow::new(RowsFrameBounds { + start: Following(1), + end: Following(2), + }), + FrameExclusion::NoOthers, true, ); @@ -511,12 +610,12 @@ mod tests { #[test] fn test_rows_frame_exclude_current_row() { - let mut buffer = WindowBuffer::new( - Frame::rows_with_exclusion( - FrameBound::UnboundedPreceding, - FrameBound::CurrentRow, - FrameExclusion::CurrentRow, - ), + let mut buffer = WindowBuffer::>::new( + RowsWindow::new(RowsFrameBounds { + start: UnboundedPreceding, + end: CurrentRow, + }), + FrameExclusion::CurrentRow, false, ); diff --git a/src/expr/core/src/window_function/state/mod.rs b/src/expr/core/src/window_function/state/mod.rs index 37ee086ca7ba4..fbaec55a84c38 100644 --- a/src/expr/core/src/window_function/state/mod.rs +++ b/src/expr/core/src/window_function/state/mod.rs @@ -114,7 +114,9 @@ pub trait WindowState: EstimateSize { fn slide_no_output(&mut self) -> Result; } -pub fn create_window_state(call: &WindowFuncCall) -> Result> { +pub type BoxedWindowState = Box; + +pub fn create_window_state(call: &WindowFuncCall) -> Result { assert!(call.frame.bounds.validate().is_ok()); use WindowFuncKind::*; @@ -122,7 +124,7 @@ pub fn create_window_state(call: &WindowFuncCall) -> Result Box::new(rank::RankState::::new(call)), Rank => Box::new(rank::RankState::::new(call)), DenseRank => Box::new(rank::RankState::::new(call)), - Aggregate(_) => Box::new(aggregate::AggregateState::new(call)?), + Aggregate(_) => aggregate::new(call)?, kind => { return Err(ExprError::UnsupportedFunction(format!( "{}({}) -> {}",