diff --git a/src/expr/core/src/window_function/kind.rs b/src/expr/core/src/window_function/kind.rs index 04b320f8ce9f2..25d195c099181 100644 --- a/src/expr/core/src/window_function/kind.rs +++ b/src/expr/core/src/window_function/kind.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use enum_as_inner::EnumAsInner; use parse_display::{Display, FromStr}; use risingwave_common::bail; @@ -19,7 +20,7 @@ use crate::aggregate::AggKind; use crate::Result; /// Kind of window functions. -#[derive(Debug, Display, FromStr, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Display, FromStr, Clone, PartialEq, Eq, Hash, EnumAsInner)] #[display(style = "snake_case")] pub enum WindowFuncKind { // General-purpose window functions. @@ -62,7 +63,7 @@ impl WindowFuncKind { } impl WindowFuncKind { - pub fn is_rank(&self) -> bool { + pub fn is_numbering(&self) -> bool { matches!(self, Self::RowNumber | Self::Rank | Self::DenseRank) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_over_window.rs b/src/frontend/src/optimizer/plan_node/logical_over_window.rs index 7a81b164fbafe..c75104643beac 100644 --- a/src/frontend/src/optimizer/plan_node/logical_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_over_window.rs @@ -278,7 +278,7 @@ impl LogicalOverWindow { let rewritten_selected_items = over_window_builder.rewrite_selected_items(select_exprs)?; for window_func in &window_functions { - if window_func.kind.is_rank() && window_func.order_by.sort_exprs.is_empty() { + if window_func.kind.is_numbering() && window_func.order_by.sort_exprs.is_empty() { return Err(ErrorCode::InvalidInputSyntax(format!( "window rank function without order by: {:?}", window_func diff --git a/src/frontend/src/optimizer/rule/over_window_split_rule.rs b/src/frontend/src/optimizer/rule/over_window_split_rule.rs index b8114f85302ca..85510a5ff9746 100644 --- a/src/frontend/src/optimizer/rule/over_window_split_rule.rs +++ 
b/src/frontend/src/optimizer/rule/over_window_split_rule.rs @@ -36,7 +36,7 @@ impl Rule for OverWindowSplitRule { .iter() .enumerate() .map(|(idx, func)| { - let func_seq = if func.kind.is_rank() { + let func_seq = if func.kind.is_numbering() { rank_func_seq += 1; rank_func_seq } else { diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs index 76fbb649038a6..10ab630a64e3a 100644 --- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs +++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs @@ -70,7 +70,7 @@ impl Rule for OverWindowToTopNRule { return None; } let window_func = &over_window.window_functions()[0]; - if !window_func.kind.is_rank() { + if !window_func.kind.is_numbering() { // Only rank functions can be converted to TopN. return None; } diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs index 1a5073b6e9546..16bc8065f2a00 100644 --- a/src/stream/src/executor/over_window/general.rs +++ b/src/stream/src/executor/over_window/general.rs @@ -152,6 +152,11 @@ pub(super) struct Calls { pub(super) end_is_unbounded: bool, /// Deduplicated indices of all arguments of all calls. pub(super) all_arg_indices: Vec, + + // TODO(rc): The following flags are used to optimize for `row_number`, `rank` and `dense_rank`. + // We should try our best to remove these flags while maintaining the performance in the future. 
+ pub(super) numbering_only: bool, + pub(super) has_rank: bool, } impl Calls { @@ -180,6 +185,9 @@ impl Calls { .dedup() .collect(); + let numbering_only = calls.iter().all(|call| call.kind.is_numbering()); + let has_rank = calls.iter().any(|call| call.kind.is_rank()); + Self { calls, super_rows_frame_bounds, @@ -187,6 +195,8 @@ impl Calls { start_is_unbounded, end_is_unbounded, all_arg_indices, + numbering_only, + has_rank, } } diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index 93f17703f3073..c2f8ea895dccb 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -395,8 +395,10 @@ impl<'a, S: StateStore> OverPartition<'a, S> { BTreeMap>, Option>, )> { - let input_schema_len = table.get_data_types().len() - self.calls.len(); let calls = self.calls; + let input_schema_len = table.get_data_types().len() - calls.len(); + let numbering_only = calls.numbering_only; + let has_rank = calls.has_rank; // return values let mut part_changes = BTreeMap::new(); @@ -413,6 +415,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { let snapshot = part_with_delta.snapshot(); let delta = part_with_delta.delta(); + let last_delta_key = delta.last_key_value().map(|(k, _)| k.as_normal_expect()); // Generate delete changes first, because deletes are skipped during iteration over // `part_with_delta` in the next step. 
@@ -442,6 +445,8 @@ impl<'a, S: StateStore> OverPartition<'a, S> { assert!(last_curr_key.is_normal()); assert!(last_frame_end.is_normal()); + let last_delta_key = last_delta_key.unwrap(); + if let Some(accessed_range) = accessed_range.as_mut() { let min_start = first_frame_start .as_normal_expect() @@ -504,12 +509,28 @@ impl<'a, S: StateStore> OverPartition<'a, S> { let (key, row) = curr_key_cursor .key_value() .expect("cursor must be valid until `last_curr_key`"); + let mut should_continue = true; + let output = states.slide_no_evict_hint()?; compute_count += 1; let old_output = &row.as_inner()[input_schema_len..]; if !old_output.is_empty() && old_output == output { same_output_count += 1; + + if numbering_only { + if has_rank { + // It's possible that an `Insert` doesn't affect its ties but affects + // all the following rows, so we need to check the `order_key`. + if key.as_normal_expect().order_key > last_delta_key.order_key { + // there won't be any more changes after this point, we can stop early + should_continue = false; + } + } else if key.as_normal_expect() >= last_delta_key { + // there won't be any more changes after this point, we can stop early + should_continue = false; + } + } } let new_row = OwnedRow::new( @@ -542,7 +563,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> { curr_key_cursor.move_next(); - key != last_curr_key + should_continue && key != last_curr_key } {} }