Skip to content

Commit

Permalink
fix btree cursor use in over_partition
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Apr 16, 2024
1 parent 819cbca commit a897e54
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions src/stream/src/executor/over_window/over_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use risingwave_expr::window_function::{
};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::StateStore;
use static_assertions::const_assert;

use super::general::RowConverter;
use crate::common::table::state_table::StateTable;
Expand Down Expand Up @@ -88,9 +89,11 @@ pub(super) fn shrink_partition_cache(
let (sk_start, sk_end) = recently_accessed_range.into_inner();
let (ck_start, ck_end) = (CacheKey::from(sk_start), CacheKey::from(sk_end));

// find the cursor just before `ck_start`
let mut cursor = range_cache.inner().upper_bound(Bound::Excluded(&ck_start));
for _ in 0..MAGIC_JITTER_PREVENTION {
if cursor.prev().is_none() {
// already at the beginning
break;
}
}
Expand All @@ -100,9 +103,11 @@ pub(super) fn shrink_partition_cache(
.unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
.clone();

let mut cursor = range_cache.inner().lower_bound(Bound::Included(&ck_start));
// find the cursor just after `ck_end`
let mut cursor = range_cache.inner().lower_bound(Bound::Excluded(&ck_end));
for _ in 0..MAGIC_JITTER_PREVENTION {
if cursor.next().is_none() {
// already at the end
break;
}
}
Expand All @@ -122,12 +127,18 @@ pub(super) fn shrink_partition_cache(
let (sk_start, _sk_end) = recently_accessed_range.into_inner();
let ck_start = CacheKey::from(sk_start);

let mut capacity_remain = MAGIC_CACHE_SIZE; // precision is not important here, code simplicity is first
let mut capacity_remain = MAGIC_CACHE_SIZE; // precision is not important here, code simplicity is the first
const_assert!(MAGIC_JITTER_PREVENTION < MAGIC_CACHE_SIZE);

let mut cursor = range_cache.inner().upper_bound(Bound::Excluded(&ck_start));
// find the cursor just before `ck_start`
let cursor_just_before_ck_start =
range_cache.inner().upper_bound(Bound::Excluded(&ck_start));

let mut cursor = cursor_just_before_ck_start.clone();
// go back for at most `MAGIC_JITTER_PREVENTION` entries
for _ in 0..MAGIC_JITTER_PREVENTION {
if cursor.prev().is_none() {
// already at the beginning
break;
}
capacity_remain -= 1;
Expand All @@ -138,10 +149,11 @@ pub(super) fn shrink_partition_cache(
.unwrap_or_else(|| range_cache.first_key_value().unwrap().0)
.clone();

let mut cursor = range_cache.inner().lower_bound(Bound::Included(&ck_start));
let mut cursor = cursor_just_before_ck_start;
// go forward for at most `capacity_remain` entries
for _ in 0..capacity_remain {
if cursor.next().is_none() {
// already at the end
break;
}
}
Expand All @@ -162,12 +174,18 @@ pub(super) fn shrink_partition_cache(
let (_sk_start, sk_end) = recently_accessed_range.into_inner();
let ck_end = CacheKey::from(sk_end);

let mut capacity_remain = MAGIC_CACHE_SIZE; // precision is not important here, code simplicity is first
let mut capacity_remain = MAGIC_CACHE_SIZE; // precision is not important here, code simplicity is the first
const_assert!(MAGIC_JITTER_PREVENTION < MAGIC_CACHE_SIZE);

// find the cursor just after `ck_end`
let cursor_just_after_ck_end =
range_cache.inner().lower_bound(Bound::Excluded(&ck_end));

let mut cursor = range_cache.inner().lower_bound(Bound::Excluded(&ck_end));
let mut cursor = cursor_just_after_ck_end.clone();
// go forward for at most `MAGIC_JITTER_PREVENTION` entries
for _ in 0..MAGIC_JITTER_PREVENTION {
if cursor.next().is_none() {
// already at the end
break;
}
capacity_remain -= 1;
Expand All @@ -178,10 +196,11 @@ pub(super) fn shrink_partition_cache(
.unwrap_or_else(|| range_cache.last_key_value().unwrap().0)
.clone();

let mut cursor = range_cache.inner().upper_bound(Bound::Included(&ck_end));
let mut cursor = cursor_just_after_ck_end;
// go back for at most `capacity_remain` entries
for _ in 0..capacity_remain {
if cursor.prev().is_none() {
// already at the beginning
break;
}
}
Expand Down

0 comments on commit a897e54

Please sign in to comment.