diff --git a/src/expr/core/src/window_function/state/aggregate.rs b/src/expr/core/src/window_function/state/aggregate.rs index d53b99782e5bf..7d3f4fe20fe57 100644 --- a/src/expr/core/src/window_function/state/aggregate.rs +++ b/src/expr/core/src/window_function/state/aggregate.rs @@ -84,7 +84,7 @@ pub(super) fn new(call: &WindowFuncCall) -> Result { agg_impl, arg_data_types, buffer: WindowBuffer::>::new( - frame_bounds.clone(), + RowsWindow::new(frame_bounds.clone()), call.frame.exclusion, enable_delta, ), diff --git a/src/expr/core/src/window_function/state/buffer.rs b/src/expr/core/src/window_function/state/buffer.rs index 99cb3b58f8317..a2fce713887bb 100644 --- a/src/expr/core/src/window_function/state/buffer.rs +++ b/src/expr/core/src/window_function/state/buffer.rs @@ -20,7 +20,7 @@ use risingwave_common::array::Op; use super::range_utils::range_except; use crate::window_function::state::range_utils::range_diff; -use crate::window_function::{FrameBoundsImpl, FrameExclusion, RowsFrameBounds}; +use crate::window_function::{FrameExclusion, RowsFrameBounds}; /// A common sliding window buffer. pub(super) struct WindowBuffer { @@ -51,20 +51,14 @@ pub(super) struct CurrWindow<'a, K> { } impl WindowBuffer { - pub fn new( - frame_bounds: W::FrameBounds, - frame_exclusion: FrameExclusion, - enable_delta: bool, - ) -> Self { - assert!(frame_bounds.validate().is_ok()); - + pub fn new(window_impl: W, frame_exclusion: FrameExclusion, enable_delta: bool) -> Self { if enable_delta { // TODO(rc): currently only support `FrameExclusion::NoOthers` for delta assert!(frame_exclusion.is_no_others()); } Self { - window_impl: W::new(frame_bounds), + window_impl, frame_exclusion, buffer: Default::default(), curr_idx: 0, @@ -219,11 +213,9 @@ pub(super) struct BufferRefMut<'a, K: Ord, V: Clone> { } pub(super) trait WindowImpl { - type FrameBounds: FrameBoundsImpl; type Key: Ord; type Value: Clone; - fn new(frame_bounds: Self::FrameBounds) -> Self; fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool; fn following_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool; fn recalculate_left_right(&self, buffer_ref: BufferRefMut<'_, Self::Key, Self::Value>); @@ -235,18 +227,19 @@ pub(super) struct RowsWindow { _phantom2: std::marker::PhantomData, } -impl WindowImpl for RowsWindow { - type FrameBounds = RowsFrameBounds; - type Key = K; - type Value = V; - - fn new(frame_bounds: Self::FrameBounds) -> Self { +impl RowsWindow { + pub fn new(frame_bounds: RowsFrameBounds) -> Self { Self { frame_bounds, _phantom: std::marker::PhantomData, _phantom2: std::marker::PhantomData, } } +} + +impl WindowImpl for RowsWindow { + type Key = K; + type Value = V; fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool { buffer_ref.curr_idx < buffer_ref.buffer.len() && { @@ -338,10 +331,10 @@ mod tests { #[test] fn test_rows_frame_unbounded_preceding_to_current_row() { let mut buffer = WindowBuffer::>::new( - RowsFrameBounds { + RowsWindow::new(RowsFrameBounds { start: UnboundedPreceding, end: CurrentRow, - }, + }), FrameExclusion::NoOthers, true, ); @@ -376,10 +369,10 @@ mod tests { #[test] fn test_rows_frame_preceding_to_current_row() { let mut buffer = WindowBuffer::>::new( - RowsFrameBounds { + RowsWindow::new(RowsFrameBounds { start: Preceding(1), end: CurrentRow, - }, + }), FrameExclusion::NoOthers, true, ); @@ -419,10 +412,10 @@ mod tests { #[test] fn test_rows_frame_preceding_to_preceding() { let mut buffer = WindowBuffer::>::new( - RowsFrameBounds { + RowsWindow::new(RowsFrameBounds { start: Preceding(2), end: Preceding(1), - }, + }), FrameExclusion::NoOthers, true, ); @@ -465,10 +458,10 @@ mod tests { #[test] fn test_rows_frame_current_row_to_unbounded_following() { let mut buffer = WindowBuffer::>::new( - RowsFrameBounds { + RowsWindow::new(RowsFrameBounds { start: CurrentRow, end: UnboundedFollowing, - }, + }), FrameExclusion::NoOthers, true, ); @@ -507,10 +500,10 @@ mod tests { #[test] fn test_rows_frame_current_row_to_following() { let mut buffer = WindowBuffer::>::new( - RowsFrameBounds { + RowsWindow::new(RowsFrameBounds { start: CurrentRow, end: Following(1), - }, + }), FrameExclusion::NoOthers, true, ); @@ -557,10 +550,10 @@ mod tests { #[test] fn test_rows_frame_following_to_following() { let mut buffer = WindowBuffer::>::new( - RowsFrameBounds { + RowsWindow::new(RowsFrameBounds { start: Following(1), end: Following(2), - }, + }), FrameExclusion::NoOthers, true, ); @@ -604,10 +597,10 @@ mod tests { #[test] fn test_rows_frame_exclude_current_row() { let mut buffer = WindowBuffer::>::new( - RowsFrameBounds { + RowsWindow::new(RowsFrameBounds { start: UnboundedPreceding, end: CurrentRow, - }, + }), FrameExclusion::CurrentRow, false, );