From 16107fde0f6ee184546d2510bffc460625f34a14 Mon Sep 17 00:00:00 2001
From: Richard Chien
Date: Mon, 28 Oct 2024 17:05:34 +0800
Subject: [PATCH] perf(over window): avoid recompute information about window frames (#19036)

Signed-off-by: Richard Chien
---
 .../src/executor/over_window/frame_finder.rs | 21 +++---
 .../src/executor/over_window/general.rs      | 73 ++++++++++++++++---
 .../executor/over_window/over_partition.rs   | 73 ++++++-------------
 3 files changed, 96 insertions(+), 71 deletions(-)

diff --git a/src/stream/src/executor/over_window/frame_finder.rs b/src/stream/src/executor/over_window/frame_finder.rs
index 12e9f21842887..4438611b5d370 100644
--- a/src/stream/src/executor/over_window/frame_finder.rs
+++ b/src/stream/src/executor/over_window/frame_finder.rs
@@ -150,7 +150,7 @@ pub(super) fn find_frame_end_for_rows_frame<'cache>(
 /// Given the first and last key in delta, calculate the order values of the first
 /// and the last frames logically affected by some `RANGE` frames.
 pub(super) fn calc_logical_curr_for_range_frames(
-    range_frames: &[&RangeFrameBounds],
+    range_frames: &[RangeFrameBounds],
     delta_first_key: &StateKey,
     delta_last_key: &StateKey,
 ) -> Option<(Sentinelled<Datum>, Sentinelled<Datum>)> {
@@ -167,7 +167,7 @@
 /// values of the logical start row of the first frame and the logical end row of the
 /// last frame.
 pub(super) fn calc_logical_boundary_for_range_frames(
-    range_frames: &[&RangeFrameBounds],
+    range_frames: &[RangeFrameBounds],
     first_curr_key: &StateKey,
     last_curr_key: &StateKey,
 ) -> Option<(Sentinelled<Datum>, Sentinelled<Datum>)> {
@@ -184,7 +184,7 @@
 /// find the most closed cache key in `part_with_delta`. Ideally this function returns
 /// the smallest key that is larger than or equal to the given logical order (using `lower_bound`).
 pub(super) fn find_left_for_range_frames<'cache>(
-    range_frames: &[&RangeFrameBounds],
+    range_frames: &[RangeFrameBounds],
     part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
     logical_order_value: impl ToDatumRef,
     cache_key_pk_len: usize, // this is dirty but we have no better choice
@@ -201,7 +201,7 @@
 /// find the most closed cache key in `part_with_delta`. Ideally this function returns
 /// the largest key that is smaller than or equal to the given logical order (using `lower_bound`).
 pub(super) fn find_right_for_range_frames<'cache>(
-    range_frames: &[&RangeFrameBounds],
+    range_frames: &[RangeFrameBounds],
     part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
     logical_order_value: impl ToDatumRef,
     cache_key_pk_len: usize, // this is dirty but we have no better choice
@@ -391,7 +391,7 @@ fn find_boundary_for_rows_frame<'cache, const LEFT: bool>(
 /// repeating. Check [`calc_logical_curr_for_range_frames`] and [`calc_logical_boundary_for_range_frames`]
 /// if you cannot understand the purpose of this function.
 fn calc_logical_ord_for_range_frames(
-    range_frames: &[&RangeFrameBounds],
+    range_frames: &[RangeFrameBounds],
     left_key: &StateKey,
     right_key: &StateKey,
     left_offset_fn: impl Fn(&RangeFrameBounds, &Datum) -> Sentinelled<Datum>,
@@ -459,7 +459,7 @@ fn calc_logical_ord_for_range_frames(
 }
 
 fn find_for_range_frames<'cache, const LEFT: bool>(
-    range_frames: &[&RangeFrameBounds],
+    range_frames: &[RangeFrameBounds],
     part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
     logical_order_value: impl ToDatumRef,
     cache_key_pk_len: usize,
@@ -1204,13 +1204,13 @@ mod tests {
         let order_type = OrderType::ascending();
 
         let range_frames = [
-            &create_range_frame(
+            create_range_frame(
                 order_data_type.clone(),
                 order_type,
                 Preceding(3i64),
                 Preceding(2i64),
             ),
-            &create_range_frame(
+            create_range_frame(
                 order_data_type.clone(),
                 order_type,
                 Preceding(1i64),
@@ -1252,7 +1252,7 @@ mod tests {
         let order_data_type = DataType::Timestamp;
         let order_type = OrderType::descending_nulls_first();
 
-        let range_frames = [&create_range_frame(
+        let range_frames = [create_range_frame(
            order_data_type.clone(),
            order_type,
            Preceding(Interval::from_month_day_usec(1, 2, 3 * 1000 * 1000)),
@@ -1320,7 +1320,7 @@ mod tests {
         expected_left: Sentinelled<StateKey>,
         expected_right: Sentinelled<StateKey>,
     ) {
-        let frames = if matches!(order_data_type, DataType::Int32) {
+        let range_frames = if matches!(order_data_type, DataType::Int32) {
             [create_range_frame(
                 order_data_type.clone(),
                 order_type,
@@ -1330,7 +1330,6 @@ mod tests {
             )]
         } else {
             panic!()
         };
-        let range_frames = frames.iter().collect::<Vec<_>>();
         let logical_order_value = Some(logical_order_value);
         let cache_key_pk_len = 1;
diff --git a/src/stream/src/executor/over_window/general.rs b/src/stream/src/executor/over_window/general.rs
index d97ea51b5360d..be71950694a04 100644
--- a/src/stream/src/executor/over_window/general.rs
+++ b/src/stream/src/executor/over_window/general.rs
@@ -24,8 +24,11 @@ use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
 use risingwave_common::types::DefaultOrdered;
 use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
 use risingwave_common::util::sort_util::OrderType;
-use risingwave_expr::window_function::{StateKey, WindowFuncCall};
+use risingwave_expr::window_function::{
+    RangeFrameBounds, RowsFrameBounds, StateKey, WindowFuncCall,
+};
 
+use super::frame_finder::merge_rows_frames;
 use super::over_partition::{
     new_empty_partition_cache, shrink_partition_cache, CacheKey, OverPartition, PartitionCache,
     PartitionDelta,
@@ -50,7 +53,7 @@ struct ExecutorInner<S: StateStore> {
     actor_ctx: ActorContextRef,
     schema: Schema,
 
-    calls: Vec<WindowFuncCall>,
+    calls: Calls,
     deduped_part_key_indices: Vec<usize>,
     order_key_indices: Vec<usize>,
     order_key_data_types: Vec<DataType>,
@@ -134,15 +137,66 @@ pub struct OverWindowExecutorArgs<S: StateStore> {
     pub cache_policy: CachePolicy,
 }
 
+/// Information about the window function calls.
+/// Contains the original calls and many other information that can be derived from the calls to avoid
+/// repeated calculation.
+pub(super) struct Calls {
+    calls: Vec<WindowFuncCall>,
+
+    /// The `ROWS` frame that is the union of all `ROWS` frames.
+    pub(super) super_rows_frame_bounds: RowsFrameBounds,
+    /// All `RANGE` frames.
+    pub(super) range_frames: Vec<RangeFrameBounds>,
+    pub(super) start_is_unbounded: bool,
+    pub(super) end_is_unbounded: bool,
+}
+
+impl Calls {
+    fn new(calls: Vec<WindowFuncCall>) -> Self {
+        let rows_frames = calls
+            .iter()
+            .filter_map(|call| call.frame.bounds.as_rows())
+            .collect::<Vec<_>>();
+        let super_rows_frame_bounds = merge_rows_frames(&rows_frames);
+        let range_frames = calls
+            .iter()
+            .filter_map(|call| call.frame.bounds.as_range())
+            .cloned()
+            .collect::<Vec<_>>();
+
+        let start_is_unbounded = calls
+            .iter()
+            .any(|call| call.frame.bounds.start_is_unbounded());
+        let end_is_unbounded = calls
+            .iter()
+            .any(|call| call.frame.bounds.end_is_unbounded());
+
+        Self {
+            calls,
+            super_rows_frame_bounds,
+            range_frames,
+            start_is_unbounded,
+            end_is_unbounded,
+        }
+    }
+
+    pub(super) fn iter(&self) -> impl ExactSizeIterator<Item = &WindowFuncCall> {
+        self.calls.iter()
+    }
+
+    pub(super) fn len(&self) -> usize {
+        self.calls.len()
+    }
+}
+
 impl<S: StateStore> OverWindowExecutor<S> {
     pub fn new(args: OverWindowExecutorArgs<S>) -> Self {
+        let calls = Calls::new(args.calls);
+
         let input_info = args.input.info().clone();
         let input_schema = &input_info.schema;
 
-        let has_unbounded_frame = args
-            .calls
-            .iter()
-            .any(|call| call.frame.bounds.is_unbounded());
+        let has_unbounded_frame = calls.start_is_unbounded || calls.end_is_unbounded;
         let cache_policy = if has_unbounded_frame {
             // For unbounded frames, we finally need all entries of the partition in the cache,
             // so for simplicity we just use full cache policy for these cases.
@@ -177,7 +231,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
             inner: ExecutorInner {
                 actor_ctx: args.actor_ctx,
                 schema: args.schema,
-                calls: args.calls,
+                calls,
                 deduped_part_key_indices,
                 order_key_indices: args.order_key_indices,
                 order_key_data_types,
@@ -353,9 +407,8 @@ impl<S: StateStore> OverWindowExecutor<S> {
             );
 
             // Build changes for current partition.
-            let (part_changes, accessed_range) = partition
-                .build_changes(&this.state_table, &this.calls, delta)
-                .await?;
+            let (part_changes, accessed_range) =
+                partition.build_changes(&this.state_table, delta).await?;
 
             for (key, record) in part_changes {
                 // Build chunk and yield if needed.
diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs
index 2ce9ecad38763..8406d094d7ff7 100644
--- a/src/stream/src/executor/over_window/over_partition.rs
+++ b/src/stream/src/executor/over_window/over_partition.rs
@@ -28,14 +28,12 @@ use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
 use risingwave_common::types::{Datum, Sentinelled};
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_common_estimate_size::collections::EstimatedBTreeMap;
-use risingwave_expr::window_function::{
-    create_window_state, RangeFrameBounds, RowsFrameBounds, StateKey, WindowFuncCall, WindowStates,
-};
+use risingwave_expr::window_function::{create_window_state, StateKey, WindowStates};
 use risingwave_storage::store::PrefetchOptions;
 use risingwave_storage::StateStore;
 use static_assertions::const_assert;
 
-use super::general::RowConverter;
+use super::general::{Calls, RowConverter};
 use crate::common::table::state_table::StateTable;
 use crate::consistency::{consistency_error, enable_strict_consistency};
 use crate::executor::over_window::frame_finder::*;
@@ -294,12 +292,7 @@ pub(super) struct OverPartition<'a, S: StateStore> {
     range_cache: &'a mut PartitionCache,
     cache_policy: CachePolicy,
 
-    /// The `ROWS` frame that is the union of all `ROWS` frames of all window functions in this
-    /// over window executor.
-    super_rows_frame_bounds: RowsFrameBounds,
-    range_frames: Vec<&'a RangeFrameBounds>,
-    start_is_unbounded: bool,
-    end_is_unbounded: bool,
+    calls: &'a Calls,
     row_conv: RowConverter<'a>,
 
     stats: OverPartitionStats,
@@ -315,36 +308,15 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
         deduped_part_key: &'a OwnedRow,
         cache: &'a mut PartitionCache,
         cache_policy: CachePolicy,
-        calls: &'a [WindowFuncCall],
+        calls: &'a Calls,
         row_conv: RowConverter<'a>,
     ) -> Self {
-        let rows_frames = calls
-            .iter()
-            .filter_map(|call| call.frame.bounds.as_rows())
-            .collect::<Vec<_>>();
-        // TODO(rc): maybe should avoid repeated merging
-        let super_rows_frame_bounds = merge_rows_frames(&rows_frames);
-        let range_frames = calls
-            .iter()
-            .filter_map(|call| call.frame.bounds.as_range())
-            .collect::<Vec<_>>();
-
-        let start_is_unbounded = calls
-            .iter()
-            .any(|call| call.frame.bounds.start_is_unbounded());
-        let end_is_unbounded = calls
-            .iter()
-            .any(|call| call.frame.bounds.end_is_unbounded());
-
         Self {
             deduped_part_key,
             range_cache: cache,
             cache_policy,
 
-            super_rows_frame_bounds,
-            range_frames,
-            start_is_unbounded,
-            end_is_unbounded,
+            calls,
             row_conv,
 
             stats: Default::default(),
@@ -418,13 +390,14 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
     pub async fn build_changes(
         &mut self,
         table: &StateTable<S>,
-        calls: &[WindowFuncCall],
         mut delta: PartitionDelta,
     ) -> StreamExecutorResult<(
         BTreeMap<StateKey, Record<OwnedRow>>,
         Option<RangeInclusive<StateKey>>,
    )> {
-        let input_schema_len = table.get_data_types().len() - calls.len();
+        let input_schema_len = table.get_data_types().len() - self.calls.len();
+        let calls = self.calls;
+
         let mut part_changes = BTreeMap::new();
         let mut accessed_entry_count = 0;
         let mut compute_count = 0;
@@ -633,7 +606,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
         let delta_last = delta.last_key_value().unwrap().0.as_normal_expect();
 
         let range_frame_logical_curr =
-            calc_logical_curr_for_range_frames(&self.range_frames, delta_first, delta_last);
+            calc_logical_curr_for_range_frames(&self.calls.range_frames, delta_first, delta_last);
 
         loop {
             // TERMINATEABILITY: `extend_cache_leftward_by_n` and `extend_cache_rightward_by_n` keep
@@ -688,7 +661,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
             // ensure everything is in the cache
             self.extend_cache_to_boundary(table).await?;
         } else {
-            // TODO(rc): later we should extend cache using `self.super_rows_frame_bounds` and
+            // TODO(rc): later we should extend cache using `self.calls.super_rows_frame_bounds` and
             // `range_frame_logical_curr` as hints.
 
             // ensure the cache covers all delta (if possible)
@@ -754,13 +727,13 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
         let first_key = part_with_delta.first_key().unwrap();
         let last_key = part_with_delta.last_key().unwrap();
 
-        let first_curr_key = if self.end_is_unbounded || delta_first_key == first_key {
+        let first_curr_key = if self.calls.end_is_unbounded || delta_first_key == first_key {
             // If the frame end is unbounded, or, the first key is in delta, then the frame corresponding
             // to the first key is always affected.
             first_key
         } else {
             let mut key = find_first_curr_for_rows_frame(
-                &self.super_rows_frame_bounds,
+                &self.calls.super_rows_frame_bounds,
                 part_with_delta,
                 delta_first_key,
             );
@@ -768,7 +741,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
             if let Some((logical_first_curr, _)) = range_frame_logical_curr {
                 let logical_curr = logical_first_curr.as_normal_expect(); // otherwise should go `end_is_unbounded` branch
                 let new_key = find_left_for_range_frames(
-                    &self.range_frames,
+                    &self.calls.range_frames,
                     part_with_delta,
                     logical_curr,
                     cache_key_pk_len,
@@ -779,12 +752,12 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
             key
         };
 
-        let last_curr_key = if self.start_is_unbounded || delta_last_key == last_key {
+        let last_curr_key = if self.calls.start_is_unbounded || delta_last_key == last_key {
             // similar to `first_curr_key`
             last_key
         } else {
             let mut key = find_last_curr_for_rows_frame(
-                &self.super_rows_frame_bounds,
+                &self.calls.super_rows_frame_bounds,
                 part_with_delta,
                 delta_last_key,
             );
@@ -792,7 +765,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
             if let Some((_, logical_last_curr)) = range_frame_logical_curr {
                 let logical_curr = logical_last_curr.as_normal_expect(); // otherwise should go `start_is_unbounded` branch
                 let new_key = find_right_for_range_frames(
-                    &self.range_frames,
+                    &self.calls.range_frames,
                     part_with_delta,
                     logical_curr,
                     cache_key_pk_len,
@@ -833,18 +806,18 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
         }
 
         let range_frame_logical_boundary = calc_logical_boundary_for_range_frames(
-            &self.range_frames,
+            &self.calls.range_frames,
             first_curr_key.as_normal_expect(),
             last_curr_key.as_normal_expect(),
         );
 
-        let first_frame_start = if self.start_is_unbounded || first_curr_key == first_key {
+        let first_frame_start = if self.calls.start_is_unbounded || first_curr_key == first_key {
             // If the frame start is unbounded, or, the first curr key is the first key, then the first key
             // always need to be included in the affected range.
             first_key
         } else {
             let mut key = find_frame_start_for_rows_frame(
-                &self.super_rows_frame_bounds,
+                &self.calls.super_rows_frame_bounds,
                 part_with_delta,
                 first_curr_key,
             );
@@ -852,7 +825,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
             if let Some((logical_first_start, _)) = range_frame_logical_boundary.as_ref() {
                 let logical_boundary = logical_first_start.as_normal_expect(); // otherwise should go `end_is_unbounded` branch
                 let new_key = find_left_for_range_frames(
-                    &self.range_frames,
+                    &self.calls.range_frames,
                     part_with_delta,
                     logical_boundary,
                     cache_key_pk_len,
@@ -864,12 +837,12 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
         };
         assert!(first_frame_start <= first_curr_key);
 
-        let last_frame_end = if self.end_is_unbounded || last_curr_key == last_key {
+        let last_frame_end = if self.calls.end_is_unbounded || last_curr_key == last_key {
            // similar to `first_frame_start`
            last_key
         } else {
             let mut key = find_frame_end_for_rows_frame(
-                &self.super_rows_frame_bounds,
+                &self.calls.super_rows_frame_bounds,
                 part_with_delta,
                 last_curr_key,
             );
@@ -877,7 +850,7 @@ impl<'a, S: StateStore> OverPartition<'a, S> {
             if let Some((_, logical_last_end)) = range_frame_logical_boundary.as_ref() {
                 let logical_boundary = logical_last_end.as_normal_expect(); // otherwise should go `end_is_unbounded` branch
                 let new_key = find_right_for_range_frames(
-                    &self.range_frames,
+                    &self.calls.range_frames,
                     part_with_delta,
                     logical_boundary,
                     cache_key_pk_len,
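
A minimal sketch of the idea behind this patch, under simplified assumptions: frame-derived information is computed once up front and then shared by reference with every partition, instead of being rebuilt in each partition constructor. The `Frame`, `FrameInfo`, and `Partition` types below are hypothetical stand-ins, not RisingWave's `WindowFuncCall`, `Calls`, or `OverPartition`, and the merging logic is deliberately toy-sized.

/// Hypothetical stand-in for a per-call window frame; `None` means "unbounded".
#[derive(Debug, Clone)]
enum Frame {
    Rows { preceding: Option<usize>, following: Option<usize> },
    Range { preceding_delta: i64, following_delta: i64 },
}

/// Facts derived from all frames, computed once instead of per partition.
#[derive(Debug)]
struct FrameInfo {
    /// Union ("super" frame) of all `ROWS` frames, one bound per side.
    super_rows_preceding: Option<usize>,
    super_rows_following: Option<usize>,
    /// All `RANGE` frames, kept for order-value-based lookups.
    range_frames: Vec<Frame>,
    start_is_unbounded: bool,
    end_is_unbounded: bool,
}

/// Take the wider of two bounds; an unbounded side absorbs everything.
fn wider(a: Option<usize>, b: Option<usize>) -> Option<usize> {
    match (a, b) {
        (Some(x), Some(y)) => Some(x.max(y)),
        _ => None,
    }
}

impl FrameInfo {
    fn new(frames: &[Frame]) -> Self {
        let mut super_rows_preceding = Some(0);
        let mut super_rows_following = Some(0);
        let mut range_frames = Vec::new();
        for frame in frames {
            match frame {
                Frame::Rows { preceding, following } => {
                    super_rows_preceding = wider(super_rows_preceding, *preceding);
                    super_rows_following = wider(super_rows_following, *following);
                }
                Frame::Range { .. } => range_frames.push(frame.clone()),
            }
        }
        // In this toy model only `ROWS` frames can be unbounded; the real `Calls`
        // also inspects `RANGE` frame bounds.
        let start_is_unbounded = super_rows_preceding.is_none();
        let end_is_unbounded = super_rows_following.is_none();
        Self {
            super_rows_preceding,
            super_rows_following,
            range_frames,
            start_is_unbounded,
            end_is_unbounded,
        }
    }
}

/// Each partition only borrows the shared, precomputed info.
struct Partition<'a> {
    info: &'a FrameInfo,
}

fn main() {
    let info = FrameInfo::new(&[
        Frame::Rows { preceding: Some(2), following: Some(0) },
        Frame::Range { preceding_delta: 10, following_delta: 0 },
    ]);
    // Many partitions share one `FrameInfo` by reference; nothing is recomputed per partition.
    let partitions: Vec<Partition<'_>> = (0..3).map(|_| Partition { info: &info }).collect();
    println!(
        "{} partitions, super ROWS frame = ({:?} preceding, {:?} following), {} RANGE frame(s)",
        partitions.len(),
        partitions[0].info.super_rows_preceding,
        partitions[0].info.super_rows_following,
        partitions[0].info.range_frames.len(),
    );
}

In the patch itself, `Calls::new` plays the role of `FrameInfo::new`, and `OverPartition` stores `&'a Calls` rather than re-deriving the merged `ROWS` frame, the `RANGE` frame list, and the unboundedness flags for every partition it processes.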