From 58406812b773adfd8ee23ad191ae70f136ce95d1 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Thu, 18 Jan 2024 13:41:18 +0800 Subject: [PATCH] generalize `WindowBuffer` Signed-off-by: Richard Chien --- Cargo.lock | 1 + src/expr/core/Cargo.toml | 1 + src/expr/core/src/window_function/call.rs | 6 +- .../core/src/window_function/state/buffer.rs | 389 +++++++++++------- 4 files changed, 245 insertions(+), 152 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 663875ba23db9..082cef29a2adf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9114,6 +9114,7 @@ dependencies = [ "ctor", "downcast-rs", "easy-ext", + "educe 0.5.7", "either", "enum-as-inner", "expect-test", diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index ff1910e9a0182..151518c6456c0 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -31,6 +31,7 @@ chrono = { version = "0.4", default-features = false, features = [ ctor = "0.2" downcast-rs = "1.2" easy-ext = "1" +educe = "0.5" either = "1" enum-as-inner = "0.6" futures-async-stream = { workspace = true } diff --git a/src/expr/core/src/window_function/call.rs b/src/expr/core/src/window_function/call.rs index 078a2dd2cd734..b99b1d5bcf1b3 100644 --- a/src/expr/core/src/window_function/call.rs +++ b/src/expr/core/src/window_function/call.rs @@ -143,6 +143,10 @@ impl FrameBounds { } } +pub trait FrameBoundsImpl { + fn validate(&self) -> Result<()>; +} + #[derive(Display, Debug, Clone, Eq, PartialEq, Hash)] #[display("ROWS BETWEEN {start} AND {end}")] pub struct RowsFrameBounds { @@ -150,7 +154,7 @@ pub struct RowsFrameBounds { pub end: FrameBound, } -impl RowsFrameBounds { +impl FrameBoundsImpl for RowsFrameBounds { fn validate(&self) -> Result<()> { FrameBound::validate_bounds(&self.start, &self.end) } diff --git a/src/expr/core/src/window_function/state/buffer.rs b/src/expr/core/src/window_function/state/buffer.rs index 3edb6d7adc164..903c2f431df61 100644 --- a/src/expr/core/src/window_function/state/buffer.rs +++ b/src/expr/core/src/window_function/state/buffer.rs @@ -15,43 +15,52 @@ use std::collections::VecDeque; use std::ops::Range; +use educe::Educe; use risingwave_common::array::Op; use super::range_utils::range_except; use crate::window_function::state::range_utils::range_diff; -use crate::window_function::{Frame, FrameBounds, FrameExclusion}; - -struct Entry { - key: K, - value: V, -} +use crate::window_function::{FrameBoundsImpl, FrameExclusion, RowsFrameBounds}; /// A common sliding window buffer. -pub struct WindowBuffer { - frame: Frame, - buffer: VecDeque>, +pub(super) struct WindowBuffer { + window_impl: W, + frame_exclusion: FrameExclusion, + buffer: VecDeque>, curr_idx: usize, left_idx: usize, // inclusive, note this can be > `curr_idx` right_excl_idx: usize, // exclusive, note this can be <= `curr_idx` - curr_delta: Option>, + curr_delta: Option>, +} + +struct Entry { + key: K, + value: V, } /// Note: A window frame can be pure preceding, pure following, or acrossing the _current row_. -pub struct CurrWindow<'a, K> { +pub(super) struct CurrWindow<'a, K> { pub key: Option<&'a K>, pub preceding_saturated: bool, pub following_saturated: bool, } -impl WindowBuffer { - pub fn new(frame: Frame, enable_delta: bool) -> Self { - assert!(frame.bounds.validate().is_ok()); +impl WindowBuffer { + pub fn new( + frame_bounds: W::FrameBounds, + frame_exclusion: FrameExclusion, + enable_delta: bool, + ) -> Self { + assert!(frame_bounds.validate().is_ok()); + if enable_delta { // TODO(rc): currently only support `FrameExclusion::NoOthers` for delta - assert!(frame.exclusion.is_no_others()); + assert!(frame_exclusion.is_no_others()); } + Self { - frame, + window_impl: W::new(frame_bounds), + frame_exclusion, buffer: Default::default(), curr_idx: 0, left_idx: 0, @@ -64,66 +73,29 @@ impl WindowBuffer { } } - fn preceding_saturated(&self) -> bool { - self.curr_key().is_some() - && match &self.frame.bounds { - FrameBounds::Rows(bounds) => { - let start_off = bounds.start.to_offset(); - if let Some(start_off) = start_off { - if start_off >= 0 { - true // pure following frame, always preceding-saturated - } else { - // FIXME(rc): Clippy rule `clippy::nonminimal_bool` is misreporting that - // the following can be simplified. - #[allow(clippy::nonminimal_bool)] - { - assert!(self.curr_idx >= self.left_idx); - } - self.curr_idx - self.left_idx >= start_off.unsigned_abs() - } - } else { - false // unbounded frame start, never preceding-saturated - } - } - } - } - - fn following_saturated(&self) -> bool { - self.curr_key().is_some() - && match &self.frame.bounds { - FrameBounds::Rows(bounds) => { - let end_off = bounds.end.to_offset(); - if let Some(end_off) = end_off { - if end_off <= 0 { - true // pure preceding frame, always following-saturated - } else { - // FIXME(rc): Ditto. - #[allow(clippy::nonminimal_bool)] - { - assert!(self.right_excl_idx > 0); - assert!(self.right_excl_idx > self.curr_idx); - assert!(self.right_excl_idx <= self.buffer.len()); - } - self.right_excl_idx - 1 - self.curr_idx >= end_off as usize - } - } else { - false // unbounded frame end, never following-saturated - } - } - } + /// Get the smallest key that is still kept in the buffer. + /// Returns `None` if there's nothing yet. + pub fn smallest_key(&self) -> Option<&W::Key> { + self.buffer.front().map(|Entry { key, .. }| key) } /// Get the key part of the current row. - pub fn curr_key(&self) -> Option<&K> { + pub fn curr_key(&self) -> Option<&W::Key> { self.buffer.get(self.curr_idx).map(|Entry { key, .. }| key) } /// Get the current window info. - pub fn curr_window(&self) -> CurrWindow<'_, K> { + pub fn curr_window(&self) -> CurrWindow<'_, W::Key> { + let buffer_ref = BufferRef { + buffer: &self.buffer, + curr_idx: self.curr_idx, + left_idx: self.left_idx, + right_excl_idx: self.right_excl_idx, + }; CurrWindow { key: self.curr_key(), - preceding_saturated: self.preceding_saturated(), - following_saturated: self.following_saturated(), + preceding_saturated: self.window_impl.preceding_saturated(buffer_ref), + following_saturated: self.window_impl.following_saturated(buffer_ref), } } @@ -133,7 +105,7 @@ impl WindowBuffer { fn curr_window_exclusion(&self) -> Range { // TODO(rc): should intersect with `curr_window_outer` to be more accurate - match self.frame.exclusion { + match self.frame_exclusion { FrameExclusion::CurrentRow => self.curr_idx..self.curr_idx + 1, FrameExclusion::NoOthers => self.curr_idx..self.curr_idx, } @@ -146,7 +118,7 @@ impl WindowBuffer { } /// Iterate over values in the current window. - pub fn curr_window_values(&self) -> impl Iterator { + pub fn curr_window_values(&self) -> impl Iterator { assert!(self.left_idx <= self.right_excl_idx); assert!(self.right_excl_idx <= self.buffer.len()); @@ -159,69 +131,17 @@ impl WindowBuffer { /// Consume the delta of values comparing the current window to the previous window. /// The delta is not guaranteed to be sorted, especially when frame exclusion is not `NoOthers`. - pub fn consume_curr_window_values_delta(&mut self) -> impl Iterator + '_ { + pub fn consume_curr_window_values_delta( + &mut self, + ) -> impl Iterator + '_ { self.curr_delta .as_mut() .expect("delta mode should be enabled") .drain(..) } - fn recalculate_left_right(&mut self) { - // TODO(rc): For the sake of simplicity, we just recalculate the left and right indices from - // `curr_idx`, rather than trying to update them incrementally. The complexity is O(n) for - // `Frame::Range` where n is the length of the buffer, for now it doesn't matter. - - if self.buffer.is_empty() { - self.left_idx = 0; - self.right_excl_idx = 0; - } - - match &self.frame.bounds { - FrameBounds::Rows(bounds) => { - let start_off = bounds.start.to_offset(); - let end_off = bounds.end.to_offset(); - if let Some(start_off) = start_off { - let logical_left_idx = self.curr_idx as isize + start_off; - if logical_left_idx >= 0 { - self.left_idx = std::cmp::min(logical_left_idx as usize, self.buffer.len()); - } else { - self.left_idx = 0; - } - } else { - // unbounded start - self.left_idx = 0; - } - if let Some(end_off) = end_off { - let logical_right_excl_idx = self.curr_idx as isize + end_off + 1; - if logical_right_excl_idx >= 0 { - self.right_excl_idx = - std::cmp::min(logical_right_excl_idx as usize, self.buffer.len()); - } else { - self.right_excl_idx = 0; - } - } else { - // unbounded end - self.right_excl_idx = self.buffer.len(); - } - } - } - } - - fn maintain_delta(&mut self, old_outer: Range, new_outer: Range) { - debug_assert!(self.frame.exclusion.is_no_others()); - - let (outer_removed, outer_added) = range_diff(old_outer.clone(), new_outer.clone()); - let delta = self.curr_delta.as_mut().unwrap(); - for idx in outer_removed.iter().cloned().flatten() { - delta.push((Op::Delete, self.buffer[idx].value.clone())); - } - for idx in outer_added.iter().cloned().flatten() { - delta.push((Op::Insert, self.buffer[idx].value.clone())); - } - } - /// Append a key-value pair to the buffer. - pub fn append(&mut self, key: K, value: V) { + pub fn append(&mut self, key: W::Key, value: W::Value) { let old_outer = self.curr_window_outer(); self.buffer.push_back(Entry { key, value }); @@ -232,15 +152,9 @@ impl WindowBuffer { } } - /// Get the smallest key that is still kept in the buffer. - /// Returns `None` if there's nothing yet. - pub fn smallest_key(&self) -> Option<&K> { - self.buffer.front().map(|Entry { key, .. }| key) - } - /// Slide the current window forward. /// Returns the keys that are removed from the buffer. - pub fn slide(&mut self) -> impl Iterator + '_ { + pub fn slide(&mut self) -> impl Iterator + '_ { let old_outer = self.curr_window_outer(); self.curr_idx += 1; @@ -258,6 +172,153 @@ impl WindowBuffer { .drain(0..min_needed_idx) .map(|Entry { key, value }| (key, value)) } + + fn maintain_delta(&mut self, old_outer: Range, new_outer: Range) { + debug_assert!(self.frame_exclusion.is_no_others()); + + let (outer_removed, outer_added) = range_diff(old_outer.clone(), new_outer.clone()); + let delta = self.curr_delta.as_mut().unwrap(); + for idx in outer_removed.iter().cloned().flatten() { + delta.push((Op::Delete, self.buffer[idx].value.clone())); + } + for idx in outer_added.iter().cloned().flatten() { + delta.push((Op::Insert, self.buffer[idx].value.clone())); + } + } + + fn recalculate_left_right(&mut self) { + let buffer_ref = BufferRefMut { + buffer: &self.buffer, + curr_idx: &mut self.curr_idx, + left_idx: &mut self.left_idx, + right_excl_idx: &mut self.right_excl_idx, + }; + self.window_impl.recalculate_left_right(buffer_ref); + } +} + +#[derive(Educe)] +#[educe(Clone, Copy)] +pub(super) struct BufferRef<'a, K: Ord, V: Clone> { + buffer: &'a VecDeque>, + curr_idx: usize, + left_idx: usize, + right_excl_idx: usize, +} + +pub(super) struct BufferRefMut<'a, K: Ord, V: Clone> { + buffer: &'a VecDeque>, + curr_idx: &'a mut usize, + left_idx: &'a mut usize, + right_excl_idx: &'a mut usize, +} + +pub(super) trait WindowImpl { + type FrameBounds: FrameBoundsImpl; + type Key: Ord; + type Value: Clone; + + fn new(frame_bounds: Self::FrameBounds) -> Self; + fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool; + fn following_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool; + fn recalculate_left_right(&self, buffer_ref: BufferRefMut<'_, Self::Key, Self::Value>); +} + +pub(super) struct RowsWindow { + frame_bounds: RowsFrameBounds, + _phantom: std::marker::PhantomData, + _phantom2: std::marker::PhantomData, +} + +impl WindowImpl for RowsWindow { + type FrameBounds = RowsFrameBounds; + type Key = K; + type Value = V; + + fn new(frame_bounds: Self::FrameBounds) -> Self { + Self { + frame_bounds, + _phantom: std::marker::PhantomData, + _phantom2: std::marker::PhantomData, + } + } + + fn preceding_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool { + buffer_ref.curr_idx < buffer_ref.buffer.len() && { + let start_off = self.frame_bounds.start.to_offset(); + if let Some(start_off) = start_off { + if start_off >= 0 { + true // pure following frame, always preceding-saturated + } else { + // FIXME(rc): Clippy rule `clippy::nonminimal_bool` is misreporting that + // the following can be simplified. + #[allow(clippy::nonminimal_bool)] + { + assert!(buffer_ref.curr_idx >= buffer_ref.left_idx); + } + buffer_ref.curr_idx - buffer_ref.left_idx >= start_off.unsigned_abs() + } + } else { + false // unbounded frame start, never preceding-saturated + } + } + } + + fn following_saturated(&self, buffer_ref: BufferRef<'_, Self::Key, Self::Value>) -> bool { + buffer_ref.curr_idx < buffer_ref.buffer.len() && { + let end_off = self.frame_bounds.end.to_offset(); + if let Some(end_off) = end_off { + if end_off <= 0 { + true // pure preceding frame, always following-saturated + } else { + // FIXME(rc): Ditto. + #[allow(clippy::nonminimal_bool)] + { + assert!(buffer_ref.right_excl_idx > 0); + assert!(buffer_ref.right_excl_idx > buffer_ref.curr_idx); + assert!(buffer_ref.right_excl_idx <= buffer_ref.buffer.len()); + } + buffer_ref.right_excl_idx - 1 - buffer_ref.curr_idx >= end_off as usize + } + } else { + false // unbounded frame end, never following-saturated + } + } + } + + fn recalculate_left_right(&self, buffer_ref: BufferRefMut<'_, Self::Key, Self::Value>) { + if buffer_ref.buffer.is_empty() { + *buffer_ref.left_idx = 0; + *buffer_ref.right_excl_idx = 0; + } + + let start_off = self.frame_bounds.start.to_offset(); + let end_off = self.frame_bounds.end.to_offset(); + if let Some(start_off) = start_off { + let logical_left_idx = *buffer_ref.curr_idx as isize + start_off; + if logical_left_idx >= 0 { + *buffer_ref.left_idx = + std::cmp::min(logical_left_idx as usize, buffer_ref.buffer.len()); + } else { + *buffer_ref.left_idx = 0; + } + } else { + // unbounded start + *buffer_ref.left_idx = 0; + } + if let Some(end_off) = end_off { + let logical_right_excl_idx = *buffer_ref.curr_idx as isize + end_off + 1; + if logical_right_excl_idx >= 0 { + *buffer_ref.right_excl_idx = + std::cmp::min(logical_right_excl_idx as usize, buffer_ref.buffer.len()); + } else { + *buffer_ref.right_excl_idx = 0; + } + } else { + // unbounded end + *buffer_ref.right_excl_idx = buffer_ref.buffer.len(); + } + } } #[cfg(test)] @@ -265,12 +326,18 @@ mod tests { use itertools::Itertools; use super::*; - use crate::window_function::{Frame, FrameBound}; + use crate::window_function::FrameBound::{ + CurrentRow, Following, Preceding, UnboundedFollowing, UnboundedPreceding, + }; #[test] fn test_rows_frame_unbounded_preceding_to_current_row() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow), + let mut buffer = WindowBuffer::>::new( + RowsFrameBounds { + start: UnboundedPreceding, + end: CurrentRow, + }, + FrameExclusion::NoOthers, true, ); @@ -303,8 +370,12 @@ mod tests { #[test] fn test_rows_frame_preceding_to_current_row() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::Preceding(1), FrameBound::CurrentRow), + let mut buffer = WindowBuffer::>::new( + RowsFrameBounds { + start: Preceding(1), + end: CurrentRow, + }, + FrameExclusion::NoOthers, true, ); @@ -342,8 +413,12 @@ mod tests { #[test] fn test_rows_frame_preceding_to_preceding() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::Preceding(2), FrameBound::Preceding(1)), + let mut buffer = WindowBuffer::>::new( + RowsFrameBounds { + start: Preceding(2), + end: Preceding(1), + }, + FrameExclusion::NoOthers, true, ); @@ -384,8 +459,12 @@ mod tests { #[test] fn test_rows_frame_current_row_to_unbounded_following() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::CurrentRow, FrameBound::UnboundedFollowing), + let mut buffer = WindowBuffer::>::new( + RowsFrameBounds { + start: CurrentRow, + end: UnboundedFollowing, + }, + FrameExclusion::NoOthers, true, ); @@ -422,8 +501,12 @@ mod tests { #[test] fn test_rows_frame_current_row_to_following() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::CurrentRow, FrameBound::Following(1)), + let mut buffer = WindowBuffer::>::new( + RowsFrameBounds { + start: CurrentRow, + end: Following(1), + }, + FrameExclusion::NoOthers, true, ); @@ -468,8 +551,12 @@ mod tests { #[test] fn test_rows_frame_following_to_following() { - let mut buffer = WindowBuffer::new( - Frame::rows(FrameBound::Following(1), FrameBound::Following(2)), + let mut buffer = WindowBuffer::>::new( + RowsFrameBounds { + start: Following(1), + end: Following(2), + }, + FrameExclusion::NoOthers, true, ); @@ -511,12 +598,12 @@ mod tests { #[test] fn test_rows_frame_exclude_current_row() { - let mut buffer = WindowBuffer::new( - Frame::rows_with_exclusion( - FrameBound::UnboundedPreceding, - FrameBound::CurrentRow, - FrameExclusion::CurrentRow, - ), + let mut buffer = WindowBuffer::>::new( + RowsFrameBounds { + start: UnboundedPreceding, + end: CurrentRow, + }, + FrameExclusion::CurrentRow, false, );