Skip to content

Commit

Permalink
short-circuit in find_affected_ranges
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Jan 9, 2024
1 parent 6a70a2e commit a6883e3
Showing 1 changed file with 85 additions and 28 deletions.
113 changes: 85 additions & 28 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,14 +822,20 @@ fn find_affected_ranges<'cache>(
.iter()
.any(|call| call.frame.bounds.end_is_unbounded());

// NOTE: Don't be too clever! Here we must calculate `first_frame_start` after calculating
// `first_curr_key`, because the correct calculation of `first_frame_start` depends on
// `first_curr_key` which is the MINIMUM of all `first_curr_key`s of all frames of all window
// function calls.

let first_curr_key = if end_is_unbounded {
// If the frame end is unbounded, the frame corresponding to the first key is always
// affected.
first_key
} else {
calls
.iter()
.map(|call| match &call.frame.bounds {
let mut min_first_curr_key = &Sentinelled::Largest;

'outer_loop: for call in calls {
let key = match &call.frame.bounds {
FrameBounds::Rows(bounds) => {
let mut cursor = part_with_delta
.lower_bound(Bound::Included(delta.first_key_value().unwrap().0));
Expand All @@ -838,80 +844,131 @@ fn find_affected_ranges<'cache>(
// cursor is at ghost position at first.
cursor.move_prev();
if cursor.position().is_ghost() {
break;
min_first_curr_key = first_key;
break 'outer_loop;
}
}
cursor.key().unwrap_or(first_key)
}
})
.min()
.expect("# of window function calls > 0")
};
min_first_curr_key = min_first_curr_key.min(key);
if min_first_curr_key == first_key {
// if we already pushed the affected curr key to the first key, no more pushing is needed
break 'outer_loop;
}
}

assert!(
!min_first_curr_key.is_largest(),
"# of window function calls > 0, meaning this must be mutated"
);

min_first_curr_key
};

let first_frame_start = if start_is_unbounded {
// If the frame start is unbounded, the first key always need to be included in the affected
// range.
first_key
} else {
calls
.iter()
.map(|call| match &call.frame.bounds {
let mut min_frame_start = &Sentinelled::Largest;

'outer_loop: for call in calls {
let key = match &call.frame.bounds {
FrameBounds::Rows(bounds) => {
let mut cursor = part_with_delta.find(first_curr_key).unwrap();
for _ in 0..bounds.start.n_preceding_rows().unwrap() {
cursor.move_prev();
if cursor.position().is_ghost() {
break;
min_frame_start = first_key;
break 'outer_loop;
}
}
cursor.key().unwrap_or(first_key)
}
})
.min()
.expect("# of window function calls > 0")
};
min_frame_start = min_frame_start.min(key);
if min_frame_start == first_key {
// if we already pushed the affected frame start to the first key, no more pushing is needed
break 'outer_loop;
}
}

assert!(
!min_frame_start.is_largest(),
"# of window function calls > 0, meaning this must be mutated"
);

min_frame_start
};

let last_curr_key = if start_is_unbounded {
last_key
} else {
calls
.iter()
.map(|call| match &call.frame.bounds {
let mut max_last_curr_key = &Sentinelled::Smallest;

'outer_loop: for call in calls {
let key = match &call.frame.bounds {
FrameBounds::Rows(bounds) => {
let mut cursor = part_with_delta
.upper_bound(Bound::Included(delta.last_key_value().unwrap().0));
for _ in 0..bounds.start.n_preceding_rows().unwrap() {
cursor.move_next();
if cursor.position().is_ghost() {
break;
max_last_curr_key = last_key;
break 'outer_loop;
}
}
cursor.key().unwrap_or(last_key)
}
})
.max()
.expect("# of window function calls > 0")
};
max_last_curr_key = max_last_curr_key.max(key);
if max_last_curr_key == last_key {
// if we already pushed the affected curr key to the last key, no more pushing is needed
break 'outer_loop;
}
}

assert!(
!max_last_curr_key.is_smallest(),
"# of window function calls > 0, meaning this must be mutated"
);

max_last_curr_key
};

let last_frame_end = if end_is_unbounded {
last_key
} else {
calls
.iter()
.map(|call| match &call.frame.bounds {
let mut max_frame_end = &Sentinelled::Smallest;

'outer_loop: for call in calls {
let key = match &call.frame.bounds {
FrameBounds::Rows(bounds) => {
let mut cursor = part_with_delta.find(last_curr_key).unwrap();
for _ in 0..bounds.end.n_following_rows().unwrap() {
cursor.move_next();
if cursor.position().is_ghost() {
break;
max_frame_end = last_key;
break 'outer_loop;
}
}
cursor.key().unwrap_or(last_key)
}
})
.max()
.expect("# of window function calls > 0")
};
max_frame_end = max_frame_end.max(key);
if max_frame_end == last_key {
// if we already pushed the affected frame end to the last key, no more pushing is needed
break 'outer_loop;
}
}

assert!(
!max_frame_end.is_smallest(),
"# of window function calls > 0, meaning this must be mutated"
);

max_frame_end
};

if first_curr_key > last_curr_key {
Expand Down

0 comments on commit a6883e3

Please sign in to comment.