Skip to content

Commit

Permalink
construct window impl outside WindowBuffer
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Jan 22, 2024
1 parent a112c92 commit 8f5ed91
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/expr/core/src/window_function/state/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub(super) fn new(call: &WindowFuncCall) -> Result<BoxedWindowState> {
agg_impl,
arg_data_types,
buffer: WindowBuffer::<RowsWindow<StateKey, StateValue>>::new(
frame_bounds.clone(),
RowsWindow::new(frame_bounds.clone()),
call.frame.exclusion,
enable_delta,
),
Expand Down
55 changes: 24 additions & 31 deletions src/expr/core/src/window_function/state/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::array::Op;

use super::range_utils::range_except;
use crate::window_function::state::range_utils::range_diff;
use crate::window_function::{FrameBoundsImpl, FrameExclusion, RowsFrameBounds};
use crate::window_function::{FrameExclusion, RowsFrameBounds};

/// A common sliding window buffer.
pub(super) struct WindowBuffer<W: WindowImpl> {
Expand Down Expand Up @@ -51,20 +51,14 @@ pub(super) struct CurrWindow<'a, K> {
}

impl<W: WindowImpl> WindowBuffer<W> {
pub fn new(
frame_bounds: W::FrameBounds,
frame_exclusion: FrameExclusion,
enable_delta: bool,
) -> Self {
assert!(frame_bounds.validate().is_ok());

pub fn new(window_impl: W, frame_exclusion: FrameExclusion, enable_delta: bool) -> Self {
if enable_delta {
// TODO(rc): currently only support `FrameExclusion::NoOthers` for delta
assert!(frame_exclusion.is_no_others());
}

Self {
window_impl: W::new(frame_bounds),
window_impl,
frame_exclusion,
buffer: Default::default(),
curr_idx: 0,
Expand Down Expand Up @@ -219,11 +213,9 @@ pub(super) struct BufferRefMut<'a, K: Ord, V: Clone> {
}

pub(super) trait WindowImpl {
type FrameBounds: FrameBoundsImpl;
type Key: Ord;
type Value: Clone;

fn new(frame_bounds: Self::FrameBounds) -> Self;
fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool;
fn following_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool;
fn recalculate_left_right(&self, buffer_ref: BufferRefMut<'_, Self::Key, Self::Value>);
Expand All @@ -235,18 +227,19 @@ pub(super) struct RowsWindow<K: Ord, V: Clone> {
_phantom2: std::marker::PhantomData<V>,
}

impl<K: Ord, V: Clone> WindowImpl for RowsWindow<K, V> {
type FrameBounds = RowsFrameBounds;
type Key = K;
type Value = V;

fn new(frame_bounds: Self::FrameBounds) -> Self {
impl<K: Ord, V: Clone> RowsWindow<K, V> {
pub fn new(frame_bounds: RowsFrameBounds) -> Self {
Self {
frame_bounds,
_phantom: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
}

impl<K: Ord, V: Clone> WindowImpl for RowsWindow<K, V> {
type Key = K;
type Value = V;

fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool {
buffer_ref.curr_idx < buffer_ref.buffer.len() && {
Expand Down Expand Up @@ -338,10 +331,10 @@ mod tests {
#[test]
fn test_rows_frame_unbounded_preceding_to_current_row() {
let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
RowsFrameBounds {
RowsWindow::new(RowsFrameBounds {
start: UnboundedPreceding,
end: CurrentRow,
},
}),
FrameExclusion::NoOthers,
true,
);
Expand Down Expand Up @@ -376,10 +369,10 @@ mod tests {
#[test]
fn test_rows_frame_preceding_to_current_row() {
let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
RowsFrameBounds {
RowsWindow::new(RowsFrameBounds {
start: Preceding(1),
end: CurrentRow,
},
}),
FrameExclusion::NoOthers,
true,
);
Expand Down Expand Up @@ -419,10 +412,10 @@ mod tests {
#[test]
fn test_rows_frame_preceding_to_preceding() {
let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
RowsFrameBounds {
RowsWindow::new(RowsFrameBounds {
start: Preceding(2),
end: Preceding(1),
},
}),
FrameExclusion::NoOthers,
true,
);
Expand Down Expand Up @@ -465,10 +458,10 @@ mod tests {
#[test]
fn test_rows_frame_current_row_to_unbounded_following() {
let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
RowsFrameBounds {
RowsWindow::new(RowsFrameBounds {
start: CurrentRow,
end: UnboundedFollowing,
},
}),
FrameExclusion::NoOthers,
true,
);
Expand Down Expand Up @@ -507,10 +500,10 @@ mod tests {
#[test]
fn test_rows_frame_current_row_to_following() {
let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
RowsFrameBounds {
RowsWindow::new(RowsFrameBounds {
start: CurrentRow,
end: Following(1),
},
}),
FrameExclusion::NoOthers,
true,
);
Expand Down Expand Up @@ -557,10 +550,10 @@ mod tests {
#[test]
fn test_rows_frame_following_to_following() {
let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
RowsFrameBounds {
RowsWindow::new(RowsFrameBounds {
start: Following(1),
end: Following(2),
},
}),
FrameExclusion::NoOthers,
true,
);
Expand Down Expand Up @@ -604,10 +597,10 @@ mod tests {
#[test]
fn test_rows_frame_exclude_current_row() {
let mut buffer = WindowBuffer::<RowsWindow<_, _>>::new(
RowsFrameBounds {
RowsWindow::new(RowsFrameBounds {
start: UnboundedPreceding,
end: CurrentRow,
},
}),
FrameExclusion::CurrentRow,
false,
);
Expand Down

0 comments on commit 8f5ed91

Please sign in to comment.