diff --git a/src/stream/src/executor/over_window/over_partition.rs b/src/stream/src/executor/over_window/over_partition.rs index e6d095e91e5c0..baac154593f7a 100644 --- a/src/stream/src/executor/over_window/over_partition.rs +++ b/src/stream/src/executor/over_window/over_partition.rs @@ -32,6 +32,7 @@ use risingwave_expr::window_function::{ }; use risingwave_storage::store::PrefetchOptions; use risingwave_storage::StateStore; +use static_assertions::const_assert; use super::general::RowConverter; use crate::common::table::state_table::StateTable; @@ -88,9 +89,11 @@ pub(super) fn shrink_partition_cache( let (sk_start, sk_end) = recently_accessed_range.into_inner(); let (ck_start, ck_end) = (CacheKey::from(sk_start), CacheKey::from(sk_end)); + // find the cursor just before `ck_start` let mut cursor = range_cache.inner().upper_bound(Bound::Excluded(&ck_start)); for _ in 0..MAGIC_JITTER_PREVENTION { if cursor.prev().is_none() { + // already at the beginning break; } } @@ -100,9 +103,11 @@ pub(super) fn shrink_partition_cache( .unwrap_or_else(|| range_cache.first_key_value().unwrap().0) .clone(); - let mut cursor = range_cache.inner().lower_bound(Bound::Included(&ck_start)); + // find the cursor just after `ck_end` + let mut cursor = range_cache.inner().lower_bound(Bound::Excluded(&ck_end)); for _ in 0..MAGIC_JITTER_PREVENTION { if cursor.next().is_none() { + // already at the end break; } } @@ -122,12 +127,18 @@ pub(super) fn shrink_partition_cache( let (sk_start, _sk_end) = recently_accessed_range.into_inner(); let ck_start = CacheKey::from(sk_start); - let mut capacity_remain = MAGIC_CACHE_SIZE; // precision is not important here, code simplicity is first + let mut capacity_remain = MAGIC_CACHE_SIZE; // precision is not important here, code simplicity is the first + const_assert!(MAGIC_JITTER_PREVENTION < MAGIC_CACHE_SIZE); - let mut cursor = range_cache.inner().upper_bound(Bound::Excluded(&ck_start)); + // find the cursor just before `ck_start` + let cursor_just_before_ck_start = + range_cache.inner().upper_bound(Bound::Excluded(&ck_start)); + + let mut cursor = cursor_just_before_ck_start.clone(); // go back for at most `MAGIC_JITTER_PREVENTION` entries for _ in 0..MAGIC_JITTER_PREVENTION { if cursor.prev().is_none() { + // already at the beginning break; } capacity_remain -= 1; @@ -138,10 +149,11 @@ pub(super) fn shrink_partition_cache( .unwrap_or_else(|| range_cache.first_key_value().unwrap().0) .clone(); - let mut cursor = range_cache.inner().lower_bound(Bound::Included(&ck_start)); + let mut cursor = cursor_just_before_ck_start; // go forward for at most `capacity_remain` entries for _ in 0..capacity_remain { if cursor.next().is_none() { + // already at the end break; } } @@ -162,12 +174,18 @@ pub(super) fn shrink_partition_cache( let (_sk_start, sk_end) = recently_accessed_range.into_inner(); let ck_end = CacheKey::from(sk_end); - let mut capacity_remain = MAGIC_CACHE_SIZE; // precision is not important here, code simplicity is first + let mut capacity_remain = MAGIC_CACHE_SIZE; // precision is not important here, code simplicity is the first + const_assert!(MAGIC_JITTER_PREVENTION < MAGIC_CACHE_SIZE); + + // find the cursor just after `ck_end` + let cursor_just_after_ck_end = + range_cache.inner().lower_bound(Bound::Excluded(&ck_end)); - let mut cursor = range_cache.inner().lower_bound(Bound::Excluded(&ck_end)); + let mut cursor = cursor_just_after_ck_end.clone(); // go forward for at most `MAGIC_JITTER_PREVENTION` entries for _ in 0..MAGIC_JITTER_PREVENTION { if cursor.next().is_none() { + // already at the end break; } capacity_remain -= 1; @@ -178,10 +196,11 @@ pub(super) fn shrink_partition_cache( .unwrap_or_else(|| range_cache.last_key_value().unwrap().0) .clone(); - let mut cursor = range_cache.inner().upper_bound(Bound::Included(&ck_end)); + let mut cursor = cursor_just_after_ck_end; // go back for at most `capacity_remain` entries for _ in 0..capacity_remain { if cursor.prev().is_none() { + // already at the beginning break; } }