Skip to content

Commit

Permalink
perf(over window): avoid recompute information about window frames (#…
Browse files Browse the repository at this point in the history
…19036)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Oct 28, 2024
1 parent 0d5ffe7 commit 16107fd
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 71 deletions.
21 changes: 10 additions & 11 deletions src/stream/src/executor/over_window/frame_finder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub(super) fn find_frame_end_for_rows_frame<'cache>(
/// Given the first and last key in delta, calculate the order values of the first
/// and the last frames logically affected by some `RANGE` frames.
pub(super) fn calc_logical_curr_for_range_frames(
range_frames: &[&RangeFrameBounds],
range_frames: &[RangeFrameBounds],
delta_first_key: &StateKey,
delta_last_key: &StateKey,
) -> Option<(Sentinelled<Datum>, Sentinelled<Datum>)> {
Expand All @@ -167,7 +167,7 @@ pub(super) fn calc_logical_curr_for_range_frames(
/// values of the logical start row of the first frame and the logical end row of the
/// last frame.
pub(super) fn calc_logical_boundary_for_range_frames(
range_frames: &[&RangeFrameBounds],
range_frames: &[RangeFrameBounds],
first_curr_key: &StateKey,
last_curr_key: &StateKey,
) -> Option<(Sentinelled<Datum>, Sentinelled<Datum>)> {
Expand All @@ -184,7 +184,7 @@ pub(super) fn calc_logical_boundary_for_range_frames(
/// find the most closed cache key in `part_with_delta`. Ideally this function returns
/// the smallest key that is larger than or equal to the given logical order (using `lower_bound`).
pub(super) fn find_left_for_range_frames<'cache>(
range_frames: &[&RangeFrameBounds],
range_frames: &[RangeFrameBounds],
part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
logical_order_value: impl ToDatumRef,
cache_key_pk_len: usize, // this is dirty but we have no better choice
Expand All @@ -201,7 +201,7 @@ pub(super) fn find_left_for_range_frames<'cache>(
/// find the most closed cache key in `part_with_delta`. Ideally this function returns
/// the largest key that is smaller than or equal to the given logical order (using `lower_bound`).
pub(super) fn find_right_for_range_frames<'cache>(
range_frames: &[&RangeFrameBounds],
range_frames: &[RangeFrameBounds],
part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
logical_order_value: impl ToDatumRef,
cache_key_pk_len: usize, // this is dirty but we have no better choice
Expand Down Expand Up @@ -391,7 +391,7 @@ fn find_boundary_for_rows_frame<'cache, const LEFT: bool>(
/// repeating. Check [`calc_logical_curr_for_range_frames`] and [`calc_logical_boundary_for_range_frames`]
/// if you cannot understand the purpose of this function.
fn calc_logical_ord_for_range_frames(
range_frames: &[&RangeFrameBounds],
range_frames: &[RangeFrameBounds],
left_key: &StateKey,
right_key: &StateKey,
left_offset_fn: impl Fn(&RangeFrameBounds, &Datum) -> Sentinelled<Datum>,
Expand Down Expand Up @@ -459,7 +459,7 @@ fn calc_logical_ord_for_range_frames(
}

fn find_for_range_frames<'cache, const LEFT: bool>(
range_frames: &[&RangeFrameBounds],
range_frames: &[RangeFrameBounds],
part_with_delta: DeltaBTreeMap<'cache, CacheKey, OwnedRow>,
logical_order_value: impl ToDatumRef,
cache_key_pk_len: usize,
Expand Down Expand Up @@ -1204,13 +1204,13 @@ mod tests {
let order_type = OrderType::ascending();

let range_frames = [
&create_range_frame(
create_range_frame(
order_data_type.clone(),
order_type,
Preceding(3i64),
Preceding(2i64),
),
&create_range_frame(
create_range_frame(
order_data_type.clone(),
order_type,
Preceding(1i64),
Expand Down Expand Up @@ -1252,7 +1252,7 @@ mod tests {
let order_data_type = DataType::Timestamp;
let order_type = OrderType::descending_nulls_first();

let range_frames = [&create_range_frame(
let range_frames = [create_range_frame(
order_data_type.clone(),
order_type,
Preceding(Interval::from_month_day_usec(1, 2, 3 * 1000 * 1000)),
Expand Down Expand Up @@ -1320,7 +1320,7 @@ mod tests {
expected_left: Sentinelled<ScalarImpl>,
expected_right: Sentinelled<ScalarImpl>,
) {
let frames = if matches!(order_data_type, DataType::Int32) {
let range_frames = if matches!(order_data_type, DataType::Int32) {
[create_range_frame(
order_data_type.clone(),
order_type,
Expand All @@ -1330,7 +1330,6 @@ mod tests {
} else {
panic!()
};
let range_frames = frames.iter().collect::<Vec<_>>();
let logical_order_value = Some(logical_order_value);
let cache_key_pk_len = 1;

Expand Down
73 changes: 63 additions & 10 deletions src/stream/src/executor/over_window/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ use risingwave_common::session_config::OverWindowCachePolicy as CachePolicy;
use risingwave_common::types::DefaultOrdered;
use risingwave_common::util::memcmp_encoding::{self, MemcmpEncoded};
use risingwave_common::util::sort_util::OrderType;
use risingwave_expr::window_function::{StateKey, WindowFuncCall};
use risingwave_expr::window_function::{
RangeFrameBounds, RowsFrameBounds, StateKey, WindowFuncCall,
};

use super::frame_finder::merge_rows_frames;
use super::over_partition::{
new_empty_partition_cache, shrink_partition_cache, CacheKey, OverPartition, PartitionCache,
PartitionDelta,
Expand All @@ -50,7 +53,7 @@ struct ExecutorInner<S: StateStore> {
actor_ctx: ActorContextRef,

schema: Schema,
calls: Vec<WindowFuncCall>,
calls: Calls,
deduped_part_key_indices: Vec<usize>,
order_key_indices: Vec<usize>,
order_key_data_types: Vec<DataType>,
Expand Down Expand Up @@ -134,15 +137,66 @@ pub struct OverWindowExecutorArgs<S: StateStore> {
pub cache_policy: CachePolicy,
}

/// Information about the window function calls.
/// Contains the original calls and many other information that can be derived from the calls to avoid
/// repeated calculation.
pub(super) struct Calls {
calls: Vec<WindowFuncCall>,

/// The `ROWS` frame that is the union of all `ROWS` frames.
pub(super) super_rows_frame_bounds: RowsFrameBounds,
/// All `RANGE` frames.
pub(super) range_frames: Vec<RangeFrameBounds>,
pub(super) start_is_unbounded: bool,
pub(super) end_is_unbounded: bool,
}

impl Calls {
fn new(calls: Vec<WindowFuncCall>) -> Self {
let rows_frames = calls
.iter()
.filter_map(|call| call.frame.bounds.as_rows())
.collect::<Vec<_>>();
let super_rows_frame_bounds = merge_rows_frames(&rows_frames);
let range_frames = calls
.iter()
.filter_map(|call| call.frame.bounds.as_range())
.cloned()
.collect::<Vec<_>>();

let start_is_unbounded = calls
.iter()
.any(|call| call.frame.bounds.start_is_unbounded());
let end_is_unbounded = calls
.iter()
.any(|call| call.frame.bounds.end_is_unbounded());

Self {
calls,
super_rows_frame_bounds,
range_frames,
start_is_unbounded,
end_is_unbounded,
}
}

pub(super) fn iter(&self) -> impl ExactSizeIterator<Item = &WindowFuncCall> {
self.calls.iter()
}

pub(super) fn len(&self) -> usize {
self.calls.len()
}
}

impl<S: StateStore> OverWindowExecutor<S> {
pub fn new(args: OverWindowExecutorArgs<S>) -> Self {
let calls = Calls::new(args.calls);

let input_info = args.input.info().clone();
let input_schema = &input_info.schema;

let has_unbounded_frame = args
.calls
.iter()
.any(|call| call.frame.bounds.is_unbounded());
let has_unbounded_frame = calls.start_is_unbounded || calls.end_is_unbounded;
let cache_policy = if has_unbounded_frame {
// For unbounded frames, we finally need all entries of the partition in the cache,
// so for simplicity we just use full cache policy for these cases.
Expand Down Expand Up @@ -177,7 +231,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
inner: ExecutorInner {
actor_ctx: args.actor_ctx,
schema: args.schema,
calls: args.calls,
calls,
deduped_part_key_indices,
order_key_indices: args.order_key_indices,
order_key_data_types,
Expand Down Expand Up @@ -353,9 +407,8 @@ impl<S: StateStore> OverWindowExecutor<S> {
);

// Build changes for current partition.
let (part_changes, accessed_range) = partition
.build_changes(&this.state_table, &this.calls, delta)
.await?;
let (part_changes, accessed_range) =
partition.build_changes(&this.state_table, delta).await?;

for (key, record) in part_changes {
// Build chunk and yield if needed.
Expand Down
Loading

0 comments on commit 16107fd

Please sign in to comment.