diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index f0c230103408f..16116fd37e743 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -822,14 +822,20 @@ fn find_affected_ranges<'cache>( .iter() .any(|call| call.frame.bounds.end_is_unbounded()); + // NOTE: Don't be too clever! Here we must calculate `first_frame_start` after calculating + // `first_curr_key`, because the correct calculation of `first_frame_start` depends on + // `first_curr_key` which is the MINIMUM of all `first_curr_key`s of all frames of all window + // function calls. + let first_curr_key = if end_is_unbounded { // If the frame end is unbounded, the frame corresponding to the first key is always // affected. first_key } else { - calls - .iter() - .map(|call| match &call.frame.bounds { + let mut min_first_curr_key = &Sentinelled::Largest; + + 'outer_loop: for call in calls { + let key = match &call.frame.bounds { FrameBounds::Rows(bounds) => { let mut cursor = part_with_delta .lower_bound(Bound::Included(delta.first_key_value().unwrap().0)); @@ -838,14 +844,26 @@ fn find_affected_ranges<'cache>( // cursor is at ghost position at first. cursor.move_prev(); if cursor.position().is_ghost() { - break; + min_first_curr_key = first_key; + break 'outer_loop; } } cursor.key().unwrap_or(first_key) } - }) - .min() - .expect("# of window function calls > 0") + }; + min_first_curr_key = min_first_curr_key.min(key); + if min_first_curr_key == first_key { + // if we already pushed the affected curr key to the first key, no more pushing is needed + break 'outer_loop; + } + } + + assert!( + !min_first_curr_key.is_largest(), + "# of window function calls > 0, meaning this must be mutated" + ); + + min_first_curr_key }; let first_frame_start = if start_is_unbounded { @@ -853,65 +871,104 @@ fn find_affected_ranges<'cache>( // range. first_key } else { - calls - .iter() - .map(|call| match &call.frame.bounds { + let mut min_frame_start = &Sentinelled::Largest; + + 'outer_loop: for call in calls { + let key = match &call.frame.bounds { FrameBounds::Rows(bounds) => { let mut cursor = part_with_delta.find(first_curr_key).unwrap(); for _ in 0..bounds.start.n_preceding_rows().unwrap() { cursor.move_prev(); if cursor.position().is_ghost() { - break; + min_frame_start = first_key; + break 'outer_loop; } } cursor.key().unwrap_or(first_key) } - }) - .min() - .expect("# of window function calls > 0") + }; + min_frame_start = min_frame_start.min(key); + if min_frame_start == first_key { + // if we already pushed the affected frame start to the first key, no more pushing is needed + break 'outer_loop; + } + } + + assert!( + !min_frame_start.is_largest(), + "# of window function calls > 0, meaning this must be mutated" + ); + + min_frame_start }; let last_curr_key = if start_is_unbounded { last_key } else { - calls - .iter() - .map(|call| match &call.frame.bounds { + let mut max_last_curr_key = &Sentinelled::Smallest; + + 'outer_loop: for call in calls { + let key = match &call.frame.bounds { FrameBounds::Rows(bounds) => { let mut cursor = part_with_delta .upper_bound(Bound::Included(delta.last_key_value().unwrap().0)); for _ in 0..bounds.start.n_preceding_rows().unwrap() { cursor.move_next(); if cursor.position().is_ghost() { - break; + max_last_curr_key = last_key; + break 'outer_loop; } } cursor.key().unwrap_or(last_key) } - }) - .max() - .expect("# of window function calls > 0") + }; + max_last_curr_key = max_last_curr_key.max(key); + if max_last_curr_key == last_key { + // if we already pushed the affected curr key to the last key, no more pushing is needed + break 'outer_loop; + } + } + + assert!( + !max_last_curr_key.is_smallest(), + "# of window function calls > 0, meaning this must be mutated" + ); + + max_last_curr_key }; let last_frame_end = if end_is_unbounded { last_key } else { - calls - .iter() - .map(|call| match &call.frame.bounds { + let mut max_frame_end = &Sentinelled::Smallest; + + 'outer_loop: for call in calls { + let key = match &call.frame.bounds { FrameBounds::Rows(bounds) => { let mut cursor = part_with_delta.find(last_curr_key).unwrap(); for _ in 0..bounds.end.n_following_rows().unwrap() { cursor.move_next(); if cursor.position().is_ghost() { - break; + max_frame_end = last_key; + break 'outer_loop; } } cursor.key().unwrap_or(last_key) } - }) - .max() - .expect("# of window function calls > 0") + }; + max_frame_end = max_frame_end.max(key); + if max_frame_end == last_key { + // if we already pushed the affected frame end to the last key, no more pushing is needed + break 'outer_loop; + } + } + + assert!( + !max_frame_end.is_smallest(), + "# of window function calls > 0, meaning this must be mutated" + ); + + max_frame_end }; if first_curr_key > last_curr_key {