diff --git a/Cargo.lock b/Cargo.lock index 477cbf02bc3c0..1da55d7e68e4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7487,6 +7487,7 @@ dependencies = [ "downcast-rs", "easy-ext", "either", + "enum-as-inner", "expect-test", "futures-async-stream", "futures-util", diff --git a/src/batch/src/executor/sort_over_window.rs b/src/batch/src/executor/sort_over_window.rs index c8b6c7ef9388c..21bfc8aa6b177 100644 --- a/src/batch/src/executor/sort_over_window.rs +++ b/src/batch/src/executor/sort_over_window.rs @@ -191,12 +191,11 @@ impl SortOverWindowExecutor { } } for row in rows.drain(..) { - if let Some(chunk) = - chunk_builder.append_one_row(row.chain(OwnedRow::new(states.curr_output()?))) + if let Some(chunk) = chunk_builder + .append_one_row(row.chain(OwnedRow::new(states.slide_no_evict_hint()?))) { yield chunk; } - states.just_slide_forward(); } } } diff --git a/src/expr/core/Cargo.toml b/src/expr/core/Cargo.toml index ada9a3639525c..ab8dd697e220d 100644 --- a/src/expr/core/Cargo.toml +++ b/src/expr/core/Cargo.toml @@ -31,6 +31,7 @@ ctor = "0.2" downcast-rs = "1.2" easy-ext = "1" either = "1" +enum-as-inner = "0.6" futures-async-stream = { workspace = true } futures-util = "0.3" itertools = "0.11" diff --git a/src/expr/core/src/window_function/call.rs b/src/expr/core/src/window_function/call.rs index a74beb672fd4f..ac7c1a2b78dd7 100644 --- a/src/expr/core/src/window_function/call.rs +++ b/src/expr/core/src/window_function/call.rs @@ -15,6 +15,7 @@ use std::cmp::Ordering; use std::fmt::Display; +use enum_as_inner::EnumAsInner; use risingwave_common::bail; use risingwave_common::types::DataType; use risingwave_pb::expr::window_frame::{PbBound, PbExclusion}; @@ -267,7 +268,7 @@ impl FrameBound { } } -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Default)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, Default, EnumAsInner)] pub enum FrameExclusion { CurrentRow, // Group, diff --git a/src/expr/core/src/window_function/state/aggregate.rs b/src/expr/core/src/window_function/state/aggregate.rs index 7deee85693ef2..38958b50b8c38 100644 --- a/src/expr/core/src/window_function/state/aggregate.rs +++ b/src/expr/core/src/window_function/state/aggregate.rs @@ -15,7 +15,7 @@ use std::collections::BTreeSet; use futures_util::FutureExt; -use risingwave_common::array::{DataChunk, StreamChunk}; +use risingwave_common::array::{DataChunk, Op, StreamChunk}; use risingwave_common::estimate_size::{EstimateSize, KvSize}; use risingwave_common::types::{DataType, Datum}; use risingwave_common::util::iter_util::ZipEqFast; @@ -24,12 +24,16 @@ use smallvec::SmallVec; use super::buffer::WindowBuffer; use super::{StateEvictHint, StateKey, StatePos, WindowState}; -use crate::aggregate::{build_append_only, AggArgs, AggCall, BoxedAggregateFunction}; +use crate::aggregate::{ + AggArgs, AggCall, AggregateFunction, AggregateState as AggImplState, BoxedAggregateFunction, +}; +use crate::sig::FUNCTION_REGISTRY; use crate::window_function::{WindowFuncCall, WindowFuncKind}; use crate::Result; pub struct AggregateState { - agg_call: AggCall, + agg_func: BoxedAggregateFunction, + agg_impl: AggImpl, arg_data_types: Vec, buffer: WindowBuffer>, buffer_heap_size: KvSize, @@ -58,13 +62,54 @@ impl AggregateState { distinct: false, direct_args: vec![], }; + let agg_func_sig = FUNCTION_REGISTRY + .get_aggregate( + agg_kind, + &arg_data_types, + &call.return_type, + false, // means prefer retractable version + ) + .expect("the agg func must exist"); + let agg_func = agg_func_sig.build_aggregate(&agg_call)?; + let (agg_impl, enable_delta) = + if !agg_func_sig.append_only && call.frame.exclusion.is_no_others() { + let init_state = agg_func.create_state(); + (AggImpl::Incremental(init_state), true) + } else { + (AggImpl::Full, false) + }; Ok(Self { - agg_call, + agg_func, + agg_impl, arg_data_types, - buffer: WindowBuffer::new(call.frame.clone()), + buffer: WindowBuffer::new(call.frame.clone(), enable_delta), buffer_heap_size: KvSize::new(), }) } + + fn slide_inner(&mut self) -> StateEvictHint { + let removed_keys: BTreeSet<_> = self + .buffer + .slide() + .map(|(k, v)| { + v.iter().for_each(|arg| { + self.buffer_heap_size.sub_val(arg); + }); + self.buffer_heap_size.sub_val(&k); + k + }) + .collect(); + if removed_keys.is_empty() { + StateEvictHint::CannotEvict( + self.buffer + .smallest_key() + .expect("sliding without removing, must have some entry in the buffer") + .clone(), + ) + } else { + StateEvictHint::CanEvict(removed_keys) + } + } } impl WindowState for AggregateState { @@ -84,36 +129,35 @@ impl WindowState for AggregateState { } } - fn curr_output(&self) -> Result { + fn slide(&mut self) -> Result<(Datum, StateEvictHint)> { let wrapper = AggregatorWrapper { - agg: build_append_only(&self.agg_call)?, + agg_func: self.agg_func.as_ref(), arg_data_types: &self.arg_data_types, }; - wrapper.aggregate(self.buffer.curr_window_values().map(SmallVec::as_slice)) + let output = match self.agg_impl { + AggImpl::Full => wrapper.aggregate(self.buffer.curr_window_values()), + AggImpl::Incremental(ref mut state) => { + wrapper.update(state, self.buffer.consume_curr_window_values_delta()) + } + }?; + let evict_hint = self.slide_inner(); + Ok((output, evict_hint)) } - fn slide_forward(&mut self) -> StateEvictHint { - let removed_keys: BTreeSet<_> = self - .buffer - .slide() - .map(|(k, v)| { - v.iter().for_each(|arg| { - self.buffer_heap_size.sub_val(arg); - }); - self.buffer_heap_size.sub_val(&k); - k - }) - .collect(); - if removed_keys.is_empty() { - StateEvictHint::CannotEvict( - self.buffer - .smallest_key() - .expect("sliding without removing, must have some entry in the buffer") - .clone(), - ) - } else { - StateEvictHint::CanEvict(removed_keys) - } + fn slide_no_output(&mut self) -> Result { + match self.agg_impl { + AggImpl::Full => {} + AggImpl::Incremental(ref mut state) => { + // for incremental agg, we need to update the state even if the caller doesn't need + // the output + let wrapper = AggregatorWrapper { + agg_func: self.agg_func.as_ref(), + arg_data_types: &self.arg_data_types, + }; + wrapper.update(state, self.buffer.consume_curr_window_values_delta())?; + } + }; + Ok(self.slide_inner()) } } @@ -125,41 +169,62 @@ impl EstimateSize for AggregateState { } } +enum AggImpl { + Incremental(AggImplState), + Full, +} + struct AggregatorWrapper<'a> { - agg: BoxedAggregateFunction, + agg_func: &'a dyn AggregateFunction, arg_data_types: &'a [DataType], } impl AggregatorWrapper<'_> { - fn aggregate<'a>(&'a self, values: impl Iterator) -> Result { - // TODO(rc): switch to a better general version of aggregator implementation + fn aggregate(&self, values: impl IntoIterator) -> Result + where + V: AsRef<[Datum]>, + { + let mut state = self.agg_func.create_state(); + self.update( + &mut state, + values.into_iter().map(|args| (Op::Insert, args)), + ) + } + fn update( + &self, + state: &mut AggImplState, + delta: impl IntoIterator, + ) -> Result + where + V: AsRef<[Datum]>, + { let mut args_builders = self .arg_data_types .iter() .map(|data_type| data_type.create_array_builder(0 /* bad! */)) .collect::>(); - let mut n_values = 0; - for value in values { - n_values += 1; - for (builder, datum) in args_builders.iter_mut().zip_eq_fast(value.iter()) { + let mut ops = Vec::new(); + let mut n_rows = 0; + for (op, value) in delta { + n_rows += 1; + ops.push(op); + for (builder, datum) in args_builders.iter_mut().zip_eq_fast(value.as_ref()) { builder.append(datum); } } - let columns = args_builders .into_iter() .map(|builder| builder.finish().into()) .collect::>(); - let chunk = StreamChunk::from(DataChunk::new(columns, n_values)); + let chunk = StreamChunk::from_parts(ops, DataChunk::new(columns, n_rows)); - let mut state = self.agg.create_state(); - self.agg - .update(&mut state, &chunk) + self.agg_func + .update(state, &chunk) .now_or_never() .expect("we don't support UDAF currently, so the function should return immediately")?; - self.agg - .get_result(&state) + self.agg_func + .get_result(state) .now_or_never() .expect("we don't support UDAF currently, so the function should return immediately") } diff --git a/src/expr/core/src/window_function/state/buffer.rs b/src/expr/core/src/window_function/state/buffer.rs index a375c7bfec225..fa684b9049459 100644 --- a/src/expr/core/src/window_function/state/buffer.rs +++ b/src/expr/core/src/window_function/state/buffer.rs @@ -15,7 +15,8 @@ use std::collections::VecDeque; use std::ops::Range; -use either::Either; +use risingwave_common::array::Op; +use smallvec::{smallvec, SmallVec}; use crate::window_function::{Frame, FrameBounds, FrameExclusion}; @@ -26,12 +27,13 @@ struct Entry { // TODO(rc): May be a good idea to extract this into a separate crate. /// A common sliding window buffer. -pub struct WindowBuffer { +pub struct WindowBuffer { frame: Frame, buffer: VecDeque>, curr_idx: usize, left_idx: usize, // inclusive, note this can be > `curr_idx` right_excl_idx: usize, // exclusive, note this can be <= `curr_idx` + curr_delta: Option>, } /// Note: A window frame can be pure preceding, pure following, or acrossing the _current row_. @@ -41,15 +43,24 @@ pub struct CurrWindow<'a, K> { pub following_saturated: bool, } -impl WindowBuffer { - pub fn new(frame: Frame) -> Self { +impl WindowBuffer { + pub fn new(frame: Frame, enable_delta: bool) -> Self { assert!(frame.bounds.is_valid()); + if enable_delta { + // TODO(rc): currently only support `FrameExclusion::NoOthers` for delta + assert!(frame.exclusion.is_no_others()); + } Self { frame, buffer: Default::default(), curr_idx: 0, left_idx: 0, right_excl_idx: 0, + curr_delta: if enable_delta { + Some(Default::default()) + } else { + None + }, } } @@ -64,7 +75,10 @@ impl WindowBuffer { } else { // FIXME(rc): Clippy rule `clippy::nonminimal_bool` is misreporting that // the following can be simplified. - // assert!(self.curr_idx >= self.left_idx); + #[allow(clippy::nonminimal_bool)] + { + assert!(self.curr_idx >= self.left_idx); + } self.curr_idx - self.left_idx >= start_off.unsigned_abs() } } else { @@ -84,9 +98,12 @@ impl WindowBuffer { true // pure preceding frame, always following-saturated } else { // FIXME(rc): Ditto. - // assert!(self.right_excl_idx > 0); - // assert!(self.right_excl_idx > self.curr_idx); - // assert!(self.right_excl_idx <= self.buffer.len()); + #[allow(clippy::nonminimal_bool)] + { + assert!(self.right_excl_idx > 0); + assert!(self.right_excl_idx > self.curr_idx); + assert!(self.right_excl_idx <= self.buffer.len()); + } self.right_excl_idx - 1 - self.curr_idx >= end_off as usize } } else { @@ -110,28 +127,43 @@ impl WindowBuffer { } } + fn curr_window_outer(&self) -> Range { + self.left_idx..self.right_excl_idx + } + + fn curr_window_exclusion(&self) -> Range { + // TODO(rc): should intersect with `curr_window_outer` to be more accurate + match self.frame.exclusion { + FrameExclusion::CurrentRow => self.curr_idx..self.curr_idx + 1, + FrameExclusion::NoOthers => self.curr_idx..self.curr_idx, + } + } + + fn curr_window_ranges(&self) -> (Range, Range) { + let selection = self.curr_window_outer(); + let exclusion = self.curr_window_exclusion(); + range_except(selection, exclusion) + } + /// Iterate over values in the current window. pub fn curr_window_values(&self) -> impl Iterator { assert!(self.left_idx <= self.right_excl_idx); assert!(self.right_excl_idx <= self.buffer.len()); - let selection = self.left_idx..self.right_excl_idx; - if selection.is_empty() { - return Either::Left(std::iter::empty()); - } + let (left, right) = self.curr_window_ranges(); + self.buffer + .range(left) + .chain(self.buffer.range(right)) + .map(|Entry { value, .. }| value) + } - let exclusion = match self.frame.exclusion { - FrameExclusion::CurrentRow => self.curr_idx..self.curr_idx + 1, - FrameExclusion::NoOthers => self.curr_idx..self.curr_idx, - }; - let (left, right) = range_except(selection, exclusion); - - Either::Right( - self.buffer - .range(left) - .chain(self.buffer.range(right)) - .map(|Entry { value, .. }| value), - ) + /// Consume the delta of values comparing the current window to the previous window. + /// The delta is not guaranteed to be sorted, especially when frame exclusion is not `NoOthers`. + pub fn consume_curr_window_values_delta(&mut self) -> impl Iterator + '_ { + self.curr_delta + .as_mut() + .expect("delta mode should be enabled") + .drain(..) } fn recalculate_left_right(&mut self) { @@ -175,10 +207,29 @@ impl WindowBuffer { } } + fn maintain_delta(&mut self, old_outer: Range, new_outer: Range) { + debug_assert!(self.frame.exclusion.is_no_others()); + + let (outer_removed, outer_added) = range_diff(old_outer.clone(), new_outer.clone()); + let delta = self.curr_delta.as_mut().unwrap(); + for idx in outer_removed.iter().cloned().flatten() { + delta.push((Op::Delete, self.buffer[idx].value.clone())); + } + for idx in outer_added.iter().cloned().flatten() { + delta.push((Op::Insert, self.buffer[idx].value.clone())); + } + } + /// Append a key-value pair to the buffer. pub fn append(&mut self, key: K, value: V) { + let old_outer = self.curr_window_outer(); + self.buffer.push_back(Entry { key, value }); - self.recalculate_left_right() + self.recalculate_left_right(); + + if self.curr_delta.is_some() { + self.maintain_delta(old_outer, self.curr_window_outer()); + } } /// Get the smallest key that is still kept in the buffer. @@ -190,8 +241,15 @@ impl WindowBuffer { /// Slide the current window forward. /// Returns the keys that are removed from the buffer. pub fn slide(&mut self) -> impl Iterator + '_ { + let old_outer = self.curr_window_outer(); + self.curr_idx += 1; self.recalculate_left_right(); + + if self.curr_delta.is_some() { + self.maintain_delta(old_outer, self.curr_window_outer()); + } + let min_needed_idx = std::cmp::min(self.left_idx, self.curr_idx); self.curr_idx -= min_needed_idx; self.left_idx -= min_needed_idx; @@ -205,7 +263,12 @@ impl WindowBuffer { /// Calculate range (A - B), the result might be the union of two ranges when B is totally included /// in the A. fn range_except(a: Range, b: Range) -> (Range, Range) { - if a.end <= b.start || b.end <= a.start { + #[allow(clippy::if_same_then_else)] // for better readability + if a.is_empty() { + (0..0, 0..0) + } else if b.is_empty() { + (a, 0..0) + } else if a.end <= b.start || b.end <= a.start { // a: [ ) // b: [ ) // or @@ -233,31 +296,129 @@ fn range_except(a: Range, b: Range) -> (Range, Range } } +/// Calculate the difference of two ranges A and B, return (removed ranges, added ranges). +/// Note this is quite different from [`range_except`]. +#[allow(clippy::type_complexity)] // looks complex but it's not +fn range_diff( + a: Range, + b: Range, +) -> (SmallVec<[Range; 2]>, SmallVec<[Range; 2]>) { + if a.start == b.start { + match a.end.cmp(&b.end) { + std::cmp::Ordering::Equal => { + // a: [ ) + // b: [ ) + (smallvec![], smallvec![]) + } + std::cmp::Ordering::Less => { + // a: [ ) + // b: [ ) + (smallvec![], smallvec![a.end..b.end]) + } + std::cmp::Ordering::Greater => { + // a: [ ) + // b: [ ) + (smallvec![b.end..a.end], smallvec![]) + } + } + } else if a.end == b.end { + debug_assert!(a.start != b.start); + if a.start < b.start { + // a: [ ) + // b: [ ) + (smallvec![a.start..b.start], smallvec![]) + } else { + // a: [ ) + // b: [ ) + (smallvec![], smallvec![b.start..a.start]) + } + } else { + debug_assert!(a.start != b.start && a.end != b.end); + if a.end <= b.start || b.end <= a.start { + // a: [ ) + // b: [ [ ) + // or + // a: [ ) + // b: [ ) ) + (smallvec![a], smallvec![b]) + } else if b.start < a.start && a.end < b.end { + // a: [ ) + // b: [ ) + (smallvec![], smallvec![b.start..a.start, a.end..b.end]) + } else if a.start < b.start && b.end < a.end { + // a: [ ) + // b: [ ) + (smallvec![a.start..b.start, b.end..a.end], smallvec![]) + } else if a.end < b.end { + // a: [ ) + // b: [ ) + (smallvec![a.start..b.start], smallvec![a.end..b.end]) + } else { + // a: [ ) + // b: [ ) + (smallvec![b.end..a.end], smallvec![b.start..a.start]) + } + } +} + #[cfg(test)] mod tests { + use std::collections::HashSet; + use itertools::Itertools; use super::*; use crate::window_function::{Frame, FrameBound}; + #[test] + fn test_range_diff() { + fn test( + a: Range, + b: Range, + expected_removed: impl IntoIterator, + expected_added: impl IntoIterator, + ) { + let (removed, added) = range_diff(a, b); + let removed_set = removed.into_iter().flatten().collect::>(); + let added_set = added.into_iter().flatten().collect::>(); + let expected_removed_set = expected_removed.into_iter().collect::>(); + let expected_added_set = expected_added.into_iter().collect::>(); + assert_eq!(removed_set, expected_removed_set); + assert_eq!(added_set, expected_added_set); + } + + test(0..0, 0..0, [], []); + test(0..1, 0..1, [], []); + test(0..1, 0..2, [], [1]); + test(0..2, 0..1, [1], []); + test(0..2, 1..2, [0], []); + test(1..2, 0..2, [], [0]); + test(0..1, 1..2, [0], [1]); + test(0..1, 2..3, [0], [2]); + test(1..2, 0..1, [1], [0]); + test(2..3, 0..1, [2], [0]); + test(0..3, 1..2, [0, 2], []); + test(1..2, 0..3, [], [0, 2]); + test(0..3, 2..4, [0, 1], [3]); + test(2..4, 0..3, [3], [0, 1]); + } + #[test] fn test_rows_frame_unbounded_preceding_to_current_row() { - let mut buffer = WindowBuffer::new(Frame::rows( - FrameBound::UnboundedPreceding, - FrameBound::CurrentRow, - )); + let mut buffer = WindowBuffer::new( + Frame::rows(FrameBound::UnboundedPreceding, FrameBound::CurrentRow), + true, + ); let window = buffer.curr_window(); assert!(window.key.is_none()); assert!(!window.preceding_saturated); assert!(!window.following_saturated); - buffer.append(1, "hello"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&1)); assert!(!window.preceding_saturated); // unbounded preceding is never saturated assert!(window.following_saturated); - buffer.append(2, "world"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&1)); @@ -267,7 +428,6 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["hello"] ); - let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); assert!(removed_keys.is_empty()); // unbouded preceding, nothing can ever be removed let window = buffer.curr_window(); @@ -279,16 +439,15 @@ mod tests { #[test] fn test_rows_frame_preceding_to_current_row() { - let mut buffer = WindowBuffer::new(Frame::rows( - FrameBound::Preceding(1), - FrameBound::CurrentRow, - )); + let mut buffer = WindowBuffer::new( + Frame::rows(FrameBound::Preceding(1), FrameBound::CurrentRow), + true, + ); let window = buffer.curr_window(); assert!(window.key.is_none()); assert!(!window.preceding_saturated); assert!(!window.following_saturated); - buffer.append(1, "hello"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&1)); @@ -298,13 +457,11 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["hello"] ); - buffer.append(2, "world"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&1)); assert!(!window.preceding_saturated); assert!(window.following_saturated); - let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); assert!(removed_keys.is_empty()); let window = buffer.curr_window(); @@ -312,7 +469,6 @@ mod tests { assert!(window.preceding_saturated); assert!(window.following_saturated); assert_eq!(buffer.smallest_key(), Some(&1)); - buffer.append(3, "!"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&2)); @@ -322,10 +478,10 @@ mod tests { #[test] fn test_rows_frame_preceding_to_preceding() { - let mut buffer = WindowBuffer::new(Frame::rows( - FrameBound::Preceding(2), - FrameBound::Preceding(1), - )); + let mut buffer = WindowBuffer::new( + Frame::rows(FrameBound::Preceding(2), FrameBound::Preceding(1)), + true, + ); buffer.append(1, "RisingWave"); let window = buffer.curr_window(); @@ -333,11 +489,9 @@ mod tests { assert!(!window.preceding_saturated); assert!(window.following_saturated); assert!(buffer.curr_window_values().collect_vec().is_empty()); - let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); assert!(removed_keys.is_empty()); assert_eq!(buffer.smallest_key(), Some(&1)); - buffer.append(2, "is the best"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&2)); @@ -347,11 +501,9 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["RisingWave"] ); - let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); assert!(removed_keys.is_empty()); assert_eq!(buffer.smallest_key(), Some(&1)); - buffer.append(3, "streaming platform"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&3)); @@ -361,7 +513,6 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["RisingWave", "is the best"] ); - let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); assert_eq!(removed_keys, vec![1]); assert_eq!(buffer.smallest_key(), Some(&2)); @@ -369,10 +520,10 @@ mod tests { #[test] fn test_rows_frame_current_row_to_unbounded_following() { - let mut buffer = WindowBuffer::new(Frame::rows( - FrameBound::CurrentRow, - FrameBound::UnboundedFollowing, - )); + let mut buffer = WindowBuffer::new( + Frame::rows(FrameBound::CurrentRow, FrameBound::UnboundedFollowing), + true, + ); buffer.append(1, "RisingWave"); let window = buffer.curr_window(); @@ -383,7 +534,6 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["RisingWave"] ); - buffer.append(2, "is the best"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&1)); @@ -393,7 +543,6 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["RisingWave", "is the best"] ); - let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); assert_eq!(removed_keys, vec![1]); assert_eq!(buffer.smallest_key(), Some(&2)); @@ -409,10 +558,10 @@ mod tests { #[test] fn test_rows_frame_current_row_to_following() { - let mut buffer = WindowBuffer::new(Frame::rows( - FrameBound::CurrentRow, - FrameBound::Following(1), - )); + let mut buffer = WindowBuffer::new( + Frame::rows(FrameBound::CurrentRow, FrameBound::Following(1)), + true, + ); buffer.append(1, "RisingWave"); let window = buffer.curr_window(); @@ -423,7 +572,6 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["RisingWave"] ); - buffer.append(2, "is the best"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&1)); @@ -433,7 +581,6 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["RisingWave", "is the best"] ); - buffer.append(3, "streaming platform"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&1)); @@ -443,7 +590,6 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["RisingWave", "is the best"] ); - let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); assert_eq!(removed_keys, vec![1]); let window = buffer.curr_window(); @@ -458,10 +604,10 @@ mod tests { #[test] fn test_rows_frame_following_to_following() { - let mut buffer = WindowBuffer::new(Frame::rows( - FrameBound::Following(1), - FrameBound::Following(2), - )); + let mut buffer = WindowBuffer::new( + Frame::rows(FrameBound::Following(1), FrameBound::Following(2)), + true, + ); buffer.append(1, "RisingWave"); let window = buffer.curr_window(); @@ -469,7 +615,6 @@ mod tests { assert!(window.preceding_saturated); assert!(!window.following_saturated); assert!(buffer.curr_window_values().collect_vec().is_empty()); - buffer.append(2, "is the best"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&1)); @@ -479,7 +624,6 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["is the best"] ); - buffer.append(3, "streaming platform"); let window = buffer.curr_window(); assert_eq!(window.key, Some(&1)); @@ -489,7 +633,6 @@ mod tests { buffer.curr_window_values().cloned().collect_vec(), vec!["is the best", "streaming platform"] ); - let removed_keys = buffer.slide().map(|(k, _)| k).collect_vec(); assert_eq!(removed_keys, vec![1]); let window = buffer.curr_window(); @@ -504,11 +647,14 @@ mod tests { #[test] fn test_rows_frame_exclude_current_row() { - let mut buffer = WindowBuffer::new(Frame::rows_with_exclusion( - FrameBound::UnboundedPreceding, - FrameBound::CurrentRow, - FrameExclusion::CurrentRow, - )); + let mut buffer = WindowBuffer::new( + Frame::rows_with_exclusion( + FrameBound::UnboundedPreceding, + FrameBound::CurrentRow, + FrameExclusion::CurrentRow, + ), + false, + ); buffer.append(1, "hello"); assert!(buffer @@ -516,7 +662,6 @@ mod tests { .cloned() .collect_vec() .is_empty()); - buffer.append(2, "world"); let _ = buffer.slide(); assert_eq!( diff --git a/src/expr/core/src/window_function/state/mod.rs b/src/expr/core/src/window_function/state/mod.rs index 971fb97f66cdc..927f5aaf6e0c0 100644 --- a/src/expr/core/src/window_function/state/mod.rs +++ b/src/expr/core/src/window_function/state/mod.rs @@ -101,11 +101,11 @@ pub trait WindowState: EstimateSize { /// Get the current window frame position. fn curr_window(&self) -> StatePos<'_>; - /// Get the window function result of current window frame. - fn curr_output(&self) -> Result; + /// Slide the window frame forward and collect the output and evict hint. Similar to `Iterator::next`. + fn slide(&mut self) -> Result<(Datum, StateEvictHint)>; - /// Slide the window frame forward. - fn slide_forward(&mut self) -> StateEvictHint; + /// Slide the window frame forward and collect the evict hint. Don't calculate the output if possible. + fn slide_no_output(&mut self) -> Result; } pub fn create_window_state(call: &WindowFuncCall) -> Result> { diff --git a/src/expr/core/src/window_function/state/row_number.rs b/src/expr/core/src/window_function/state/row_number.rs index fd485292c9382..6a2759d69308c 100644 --- a/src/expr/core/src/window_function/state/row_number.rs +++ b/src/expr/core/src/window_function/state/row_number.rs @@ -36,6 +36,19 @@ impl RowNumberState { curr_row_number: 1, } } + + fn slide_inner(&mut self) -> StateEvictHint { + self.curr_row_number += 1; + self.buffer + .pop_front() + .expect("should not slide forward when the current window is not ready"); + // can't evict any state key in EOWC mode, because we can't recover from previous output now + StateEvictHint::CannotEvict( + self.first_key + .clone() + .expect("should have appended some rows"), + ) + } } impl WindowState for RowNumberState { @@ -54,25 +67,18 @@ impl WindowState for RowNumberState { } } - fn curr_output(&self) -> Result { - if self.curr_window().is_ready { - Ok(Some(self.curr_row_number.into())) + fn slide(&mut self) -> Result<(Datum, StateEvictHint)> { + let output = if self.curr_window().is_ready { + Some(self.curr_row_number.into()) } else { - Ok(None) - } + None + }; + let evict_hint = self.slide_inner(); + Ok((output, evict_hint)) } - fn slide_forward(&mut self) -> StateEvictHint { - self.curr_row_number += 1; - self.buffer - .pop_front() - .expect("should not slide forward when the current window is not ready"); - // can't evict any state key in EOWC mode, because we can't recover from previous output now - StateEvictHint::CannotEvict( - self.first_key - .clone() - .expect("should have appended some rows"), - ) + fn slide_no_output(&mut self) -> Result { + Ok(self.slide_inner()) } } @@ -92,6 +98,24 @@ mod tests { } } + #[test] + #[should_panic(expected = "should not slide forward when the current window is not ready")] + fn test_row_number_state_bad_use() { + let call = WindowFuncCall { + kind: WindowFuncKind::RowNumber, + args: AggArgs::None, + return_type: DataType::Int64, + frame: Frame::rows( + FrameBound::UnboundedPreceding, + FrameBound::UnboundedFollowing, + ), + }; + let mut state = RowNumberState::new(&call); + assert!(state.curr_window().key.is_none()); + assert!(!state.curr_window().is_ready); + _ = state.slide() + } + #[test] fn test_row_number_state() { let call = WindowFuncCall { @@ -106,24 +130,23 @@ mod tests { let mut state = RowNumberState::new(&call); assert!(state.curr_window().key.is_none()); assert!(!state.curr_window().is_ready); - assert!(state.curr_output().unwrap().is_none()); state.append(create_state_key(100), SmallVec::new()); assert_eq!(state.curr_window().key.unwrap(), &create_state_key(100)); assert!(state.curr_window().is_ready); - assert_eq!(state.curr_output().unwrap().unwrap(), 1i64.into()); - state.append(create_state_key(103), SmallVec::new()); - state.append(create_state_key(102), SmallVec::new()); - assert_eq!(state.curr_window().key.unwrap(), &create_state_key(100)); - let evict_hint = state.slide_forward(); + let (output, evict_hint) = state.slide().unwrap(); + assert_eq!(output.unwrap(), 1i64.into()); match evict_hint { StateEvictHint::CannotEvict(state_key) => { assert_eq!(state_key, create_state_key(100)); } _ => unreachable!(), } + assert!(!state.curr_window().is_ready); + state.append(create_state_key(103), SmallVec::new()); + state.append(create_state_key(102), SmallVec::new()); assert_eq!(state.curr_window().key.unwrap(), &create_state_key(103)); - assert_eq!(state.curr_output().unwrap().unwrap(), 2i64.into()); - let evict_hint = state.slide_forward(); + let (output, evict_hint) = state.slide().unwrap(); + assert_eq!(output.unwrap(), 2i64.into()); match evict_hint { StateEvictHint::CannotEvict(state_key) => { assert_eq!(state_key, create_state_key(100)); @@ -131,6 +154,7 @@ mod tests { _ => unreachable!(), } assert_eq!(state.curr_window().key.unwrap(), &create_state_key(102)); - assert_eq!(state.curr_output().unwrap().unwrap(), 3i64.into()); + let (output, _) = state.slide().unwrap(); + assert_eq!(output.unwrap(), 3i64.into()); } } diff --git a/src/expr/core/src/window_function/states.rs b/src/expr/core/src/window_function/states.rs index d039eb323e101..3a506d165c356 100644 --- a/src/expr/core/src/window_function/states.rs +++ b/src/expr/core/src/window_function/states.rs @@ -48,28 +48,43 @@ impl WindowStates { self.0.iter().all(|state| state.curr_window().is_ready) } - /// Get the current output of all windows. - pub fn curr_output(&self) -> Result> { + /// Slide all windows forward and collect the output and evict hints. + pub fn slide(&mut self) -> Result<(Vec, StateEvictHint)> { debug_assert!(self.are_aligned()); - self.0.iter().map(|state| state.curr_output()).try_collect() + let mut output = Vec::with_capacity(self.0.len()); + let mut evict_hint: Option = None; + for state in &mut self.0 { + let (x_output, x_evict) = state.slide()?; + output.push(x_output); + evict_hint = match evict_hint { + Some(evict_hint) => Some(evict_hint.merge(x_evict)), + None => Some(x_evict), + }; + } + Ok(( + output, + evict_hint.expect("# of evict hints = # of window states"), + )) } - /// Slide all windows forward and collect the evict hints. - pub fn slide_forward(&mut self) -> StateEvictHint { + /// Slide all windows forward and collect the output, ignoring the evict hints. + pub fn slide_no_evict_hint(&mut self) -> Result> { debug_assert!(self.are_aligned()); - self.0 - .iter_mut() - .map(|state| state.slide_forward()) - .reduce(StateEvictHint::merge) - .expect("# of evict hints = # of window states") + let mut output = Vec::with_capacity(self.0.len()); + for state in &mut self.0 { + let (x_output, _) = state.slide()?; + output.push(x_output); + } + Ok(output) } - /// Slide all windows forward, ignoring the evict hints. - pub fn just_slide_forward(&mut self) { + /// Slide all windows forward, ignoring the output and evict hints. + pub fn just_slide(&mut self) -> Result<()> { debug_assert!(self.are_aligned()); - self.0 - .iter_mut() - .for_each(|state| _ = state.slide_forward()); + for state in &mut self.0 { + state.slide_no_output()?; + } + Ok(()) } } diff --git a/src/stream/src/executor/over_window/eowc.rs b/src/stream/src/executor/over_window/eowc.rs index b5da45edd47e5..fa20e3b49d970 100644 --- a/src/stream/src/executor/over_window/eowc.rs +++ b/src/stream/src/executor/over_window/eowc.rs @@ -241,7 +241,7 @@ impl EowcOverWindowExecutor { // Ignore ready windows (all ready windows were outputted before). while partition.states.are_ready() { - partition.states.just_slide_forward(); + partition.states.just_slide()?; partition.curr_row_buffer.pop_front(); } @@ -276,7 +276,8 @@ impl EowcOverWindowExecutor { &encoded_partition_key, ) .await?; - let mut partition = vars.partitions.get_mut(&encoded_partition_key).unwrap(); + let partition: &mut Partition = + &mut vars.partitions.get_mut(&encoded_partition_key).unwrap(); // Materialize input to state table. this.state_table.insert(input_row); @@ -314,7 +315,7 @@ impl EowcOverWindowExecutor { // The partition is ready to output, so we can produce a row. // Get all outputs. - let ret_values = partition.states.curr_output()?; + let (ret_values, evict_hint) = partition.states.slide()?; let curr_row = partition .curr_row_buffer .pop_front() @@ -330,7 +331,6 @@ impl EowcOverWindowExecutor { } // Evict unneeded rows from state table. - let evict_hint = partition.states.slide_forward(); if let StateEvictHint::CanEvict(keys_to_evict) = evict_hint { for key in keys_to_evict { let order_key = memcmp_encoding::decode_row( diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index c9717f9defe61..38c959039bf56 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -512,7 +512,7 @@ impl OverWindowExecutor { // Slide to the first affected key. We can safely compare to `Some(first_curr_key)` here // because it must exist in the states, by the definition of affected range. while states.curr_key() != Some(first_curr_key.as_normal_expect()) { - states.just_slide_forward(); + states.just_slide()?; } let mut curr_key_cursor = part_with_delta.find(first_curr_key).unwrap(); assert_eq!( @@ -525,7 +525,7 @@ impl OverWindowExecutor { let (key, row) = curr_key_cursor .key_value() .expect("cursor must be valid until `last_curr_key`"); - let output = states.curr_output()?; + let output = states.slide_no_evict_hint()?; let new_row = OwnedRow::new( row.as_inner() .iter() @@ -554,7 +554,6 @@ impl OverWindowExecutor { } } - states.just_slide_forward(); curr_key_cursor.move_next(); key != last_curr_key diff --git a/src/stream/tests/integration_tests/eowc_over_window.rs b/src/stream/tests/integration_tests/eowc_over_window.rs index 9407b6013dc03..7334654d8dd50 100644 --- a/src/stream/tests/integration_tests/eowc_over_window.rs +++ b/src/stream/tests/integration_tests/eowc_over_window.rs @@ -186,13 +186,17 @@ async fn test_over_window_aggregate() { check_with_script( || create_executor(calls.clone(), store.clone()), r###" -- !barrier 1 -- !chunk |2 - I T I i - + 1 p1 100 10 - + 1 p1 101 16 - + 4 p1 102 20 -"###, + - !barrier 1 + - !chunk |2 + I T I i + + 1 p1 100 10 + + 1 p1 101 16 + + 4 p1 102 20 + - !chunk |2 + I T I i + + 2 p1 103 30 + + 6 p1 104 11 + "###, expect![[r#" - input: !barrier 1 output: @@ -209,6 +213,17 @@ async fn test_over_window_aggregate() { | + | 1 | p1 | 100 | 10 | 26 | | + | 1 | p1 | 101 | 16 | 46 | +---+---+----+-----+----+----+ + - input: !chunk |- + +---+---+----+-----+----+ + | + | 2 | p1 | 103 | 30 | + | + | 6 | p1 | 104 | 11 | + +---+---+----+-----+----+ + output: + - !chunk |- + +---+---+----+-----+----+----+ + | + | 4 | p1 | 102 | 20 | 66 | + | + | 2 | p1 | 103 | 30 | 61 | + +---+---+----+-----+----+----+ "#]], SnapshotOptions::default(), )