Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Jan 9, 2024
1 parent a6883e3 commit cb73122
Showing 1 changed file with 12 additions and 36 deletions.
48 changes: 12 additions & 36 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ fn find_affected_ranges<'cache>(
} else {
let mut min_first_curr_key = &Sentinelled::Largest;

'outer_loop: for call in calls {
for call in calls {
let key = match &call.frame.bounds {
FrameBounds::Rows(bounds) => {
let mut cursor = part_with_delta
Expand All @@ -844,8 +844,7 @@ fn find_affected_ranges<'cache>(
// cursor is at ghost position at first.
cursor.move_prev();
if cursor.position().is_ghost() {
min_first_curr_key = first_key;
break 'outer_loop;
break;
}
}
cursor.key().unwrap_or(first_key)
Expand All @@ -854,15 +853,10 @@ fn find_affected_ranges<'cache>(
min_first_curr_key = min_first_curr_key.min(key);
if min_first_curr_key == first_key {
// if we already pushed the affected curr key to the first key, no more pushing is needed
break 'outer_loop;
break;
}
}

assert!(
!min_first_curr_key.is_largest(),
"# of window function calls > 0, meaning this must be mutated"
);

min_first_curr_key
};

Expand All @@ -873,15 +867,14 @@ fn find_affected_ranges<'cache>(
} else {
let mut min_frame_start = &Sentinelled::Largest;

'outer_loop: for call in calls {
for call in calls {
let key = match &call.frame.bounds {
FrameBounds::Rows(bounds) => {
let mut cursor = part_with_delta.find(first_curr_key).unwrap();
for _ in 0..bounds.start.n_preceding_rows().unwrap() {
cursor.move_prev();
if cursor.position().is_ghost() {
min_frame_start = first_key;
break 'outer_loop;
break;
}
}
cursor.key().unwrap_or(first_key)
Expand All @@ -890,15 +883,10 @@ fn find_affected_ranges<'cache>(
min_frame_start = min_frame_start.min(key);
if min_frame_start == first_key {
// if we already pushed the affected frame start to the first key, no more pushing is needed
break 'outer_loop;
break;
}
}

assert!(
!min_frame_start.is_largest(),
"# of window function calls > 0, meaning this must be mutated"
);

min_frame_start
};

Expand All @@ -907,16 +895,15 @@ fn find_affected_ranges<'cache>(
} else {
let mut max_last_curr_key = &Sentinelled::Smallest;

'outer_loop: for call in calls {
for call in calls {
let key = match &call.frame.bounds {
FrameBounds::Rows(bounds) => {
let mut cursor = part_with_delta
.upper_bound(Bound::Included(delta.last_key_value().unwrap().0));
for _ in 0..bounds.start.n_preceding_rows().unwrap() {
cursor.move_next();
if cursor.position().is_ghost() {
max_last_curr_key = last_key;
break 'outer_loop;
break;
}
}
cursor.key().unwrap_or(last_key)
Expand All @@ -925,15 +912,10 @@ fn find_affected_ranges<'cache>(
max_last_curr_key = max_last_curr_key.max(key);
if max_last_curr_key == last_key {
// if we already pushed the affected curr key to the last key, no more pushing is needed
break 'outer_loop;
break;
}
}

assert!(
!max_last_curr_key.is_smallest(),
"# of window function calls > 0, meaning this must be mutated"
);

max_last_curr_key
};

Expand All @@ -942,15 +924,14 @@ fn find_affected_ranges<'cache>(
} else {
let mut max_frame_end = &Sentinelled::Smallest;

'outer_loop: for call in calls {
for call in calls {
let key = match &call.frame.bounds {
FrameBounds::Rows(bounds) => {
let mut cursor = part_with_delta.find(last_curr_key).unwrap();
for _ in 0..bounds.end.n_following_rows().unwrap() {
cursor.move_next();
if cursor.position().is_ghost() {
max_frame_end = last_key;
break 'outer_loop;
break;
}
}
cursor.key().unwrap_or(last_key)
Expand All @@ -959,15 +940,10 @@ fn find_affected_ranges<'cache>(
max_frame_end = max_frame_end.max(key);
if max_frame_end == last_key {
// if we already pushed the affected frame end to the last key, no more pushing is needed
break 'outer_loop;
break;
}
}

assert!(
!max_frame_end.is_smallest(),
"# of window function calls > 0, meaning this must be mutated"
);

max_frame_end
};

Expand Down

0 comments on commit cb73122

Please sign in to comment.