Skip to content

Commit

Permalink
fix btree_cursor for over_window
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Feb 3, 2024
1 parent 2da642d commit 4b1af8a
Showing 1 changed file with 19 additions and 19 deletions.
38 changes: 19 additions & 19 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,25 +88,25 @@ pub(super) fn shrink_partition_cache(

let mut cursor = range_cache.inner().upper_bound(Bound::Excluded(&ck_start));
for _ in 0..MAGIC_JITTER_PREVENTION {
if cursor.key().is_none() {
if cursor.prev().is_none() {
break;
}
cursor.move_prev();
}
let start = cursor
.key()
.peek_prev()
.map(|(k, _)| k)
.unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
.clone();

let mut cursor = range_cache.inner().lower_bound(Bound::Excluded(&ck_end));
let mut cursor = range_cache.inner().lower_bound(Bound::Included(&ck_start));
for _ in 0..MAGIC_JITTER_PREVENTION {
if cursor.key().is_none() {
if cursor.next().is_none() {
break;
}
cursor.move_next();
}
let end = cursor
.key()
.peek_next()
.map(|(k, _)| k)
.unwrap_or_else(|| range_cache.last_key_value().unwrap().0)
.clone();

Expand All @@ -125,27 +125,27 @@ pub(super) fn shrink_partition_cache(
let mut cursor = range_cache.inner().upper_bound(Bound::Excluded(&ck_start));
// go back for at most `MAGIC_JITTER_PREVENTION` entries
for _ in 0..MAGIC_JITTER_PREVENTION {
if cursor.key().is_none() {
if cursor.prev().is_none() {
break;
}
cursor.move_prev();
capacity_remain -= 1;
}
let start = cursor
.key()
.peek_prev()
.map(|(k, _)| k)
.unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
.clone();

let mut cursor = range_cache.inner().lower_bound(Bound::Included(&ck_start));
// go forward for at most `capacity_remain` entries
for _ in 0..capacity_remain {
if cursor.key().is_none() {
if cursor.next().is_none() {
break;
}
cursor.move_next();
}
let end = cursor
.key()
.peek_next()
.map(|(k, _)| k)
.unwrap_or_else(|| range_cache.last_key_value().unwrap().0)
.clone();

Expand All @@ -165,27 +165,27 @@ pub(super) fn shrink_partition_cache(
let mut cursor = range_cache.inner().lower_bound(Bound::Excluded(&ck_end));
// go forward for at most `MAGIC_JITTER_PREVENTION` entries
for _ in 0..MAGIC_JITTER_PREVENTION {
if cursor.key().is_none() {
if cursor.next().is_none() {
break;
}
cursor.move_next();
capacity_remain -= 1;
}
let end = cursor
.key()
.peek_next()
.map(|(k, _)| k)
.unwrap_or_else(|| range_cache.last_key_value().unwrap().0)
.clone();

let mut cursor = range_cache.inner().upper_bound(Bound::Included(&ck_end));
// go back for at most `capacity_remain` entries
for _ in 0..capacity_remain {
if cursor.key().is_none() {
if cursor.prev().is_none() {
break;
}
cursor.move_prev();
}
let start = cursor
.key()
.peek_prev()
.map(|(k, _)| k)
.unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
.clone();

Expand Down

0 comments on commit 4b1af8a

Please sign in to comment.