diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index 1313a8676a8b0..8a68c10d0b515 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -12,15 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::ops::Bound; use std::ops::Bound::*; use risingwave_common::must_match; use risingwave_common::util::epoch::MAX_SPILL_TIMES; -use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, KeyPayloadType, UserKey, UserKeyRange}; +use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, UserKeyRange}; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch}; -use crate::hummock::iterator::{Forward, HummockIterator, UserKeyEndBoundedIterator}; +use crate::hummock::iterator::{Forward, HummockIterator}; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; @@ -29,13 +28,13 @@ use crate::monitor::StoreLocalStatistic; /// [`UserIterator`] can be used by user directly. pub struct UserIterator> { /// Inner table iterator. - iterator: UserKeyEndBoundedIterator, + iterator: I, // Track the last seen full key full_key_tracker: FullKeyTracker, true>, /// Start and end bounds of user key. - start_bound: Bound>, + key_range: UserKeyRange, /// Only reads values if `ts <= self.read_epoch`. read_epoch: HummockEpoch, @@ -47,24 +46,29 @@ pub struct UserIterator> { _version: Option, stats: StoreLocalStatistic, + + /// Whether the iterator is pointing to a valid position + is_current_pos_valid: bool, } +// TODO: decide whether this should also impl `HummockIterator` impl> UserIterator { /// Create [`UserIterator`] with given `read_epoch`. pub(crate) fn new( iterator: I, - (start_bound, end_bound): UserKeyRange, + key_range: UserKeyRange, read_epoch: u64, min_epoch: u64, version: Option, ) -> Self { Self { - iterator: UserKeyEndBoundedIterator::new(iterator, end_bound), - start_bound, + iterator, + key_range, read_epoch, min_epoch, stats: StoreLocalStatistic::default(), _version: version, + is_current_pos_valid: false, full_key_tracker: FullKeyTracker::new(FullKey::default()), } } @@ -83,6 +87,8 @@ impl> UserIterator { /// (may reach to the end and thus not valid) /// - if `Err(_) ` is returned, it means that some error happened. pub async fn next(&mut self) -> HummockResult<()> { + // Reset the valid flag to make sure if error happens, `is_valid` should return false. + self.is_current_pos_valid = false; // Move the iterator to the next step if it is currently potined to a ready entry. self.iterator.next().await?; @@ -99,10 +105,6 @@ impl> UserIterator { /// Note: before call the function you need to ensure that the iterator is valid. pub fn key(&self) -> FullKey<&[u8]> { assert!(self.is_valid()); - debug_assert_eq!( - self.full_key_tracker.latest_full_key.to_ref(), - self.iterator.key() - ); self.full_key_tracker.latest_full_key.to_ref() } @@ -117,10 +119,11 @@ impl> UserIterator { /// Resets the iterating position to the beginning. pub async fn rewind(&mut self) -> HummockResult<()> { // Reset + self.is_current_pos_valid = false; self.full_key_tracker = FullKeyTracker::new(FullKey::default()); // Handle range scan - match &self.start_bound { + match &self.key_range.0 { Included(begin_key) => { let full_key = FullKey { user_key: begin_key.clone(), @@ -140,10 +143,11 @@ impl> UserIterator { /// Resets the iterating position to the first position where the key >= provided key. pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> { // Reset + self.is_current_pos_valid = false; self.full_key_tracker = FullKeyTracker::new(FullKey::default()); // Handle range scan when key < begin_key - let user_key = match &self.start_bound { + let user_key = match &self.key_range.0 { Included(begin_key) => { let begin_key = begin_key.as_ref(); if begin_key > user_key { @@ -167,7 +171,7 @@ impl> UserIterator { /// Indicates whether the iterator can be used. pub fn is_valid(&self) -> bool { - self.iterator.is_valid() + self.is_current_pos_valid } pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { @@ -178,7 +182,11 @@ impl> UserIterator { /// Advance the inner iterator to a valid position, in which the entry can be exposed. /// Iterator will not be advanced if it already pointed to a valid position. async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> { - while self.iterator.is_valid() { + loop { + if !self.iterator.is_valid() { + break; + } + let full_key = self.iterator.key(); let epoch = full_key.epoch_with_gap.pure_epoch(); @@ -197,10 +205,15 @@ impl> UserIterator { // A new user key is observed. + if self.user_key_out_of_range(full_key.user_key) { + break; + } + // Handle delete operation match self.iterator.value() { HummockValue::Put(_val) => { self.stats.processed_key_count += 1; + self.is_current_pos_valid = true; return Ok(()); } // It means that the key is deleted from the storage. @@ -212,8 +225,20 @@ impl> UserIterator { } self.iterator.next().await?; } + + self.is_current_pos_valid = false; Ok(()) } + + // Validate whether the current key is already out of range. + fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool { + // handle range scan + match &self.key_range.1 { + Included(end_key) => user_key > end_key.as_ref(), + Excluded(end_key) => user_key >= end_key.as_ref(), + Unbounded => false, + } + } } #[cfg(test)] diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index 91a255942a80a..83bf2de61b136 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -14,8 +14,7 @@ use std::future::Future; use std::marker::PhantomData; -use std::ops::Bound::{Excluded, Included, Unbounded}; -use std::ops::{Bound, Deref, DerefMut}; +use std::ops::{Deref, DerefMut}; use more_asserts::assert_gt; @@ -37,7 +36,7 @@ pub mod forward_user; mod merge_inner; pub use forward_user::*; pub use merge_inner::MergeIterator; -use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, TableKey, UserKey}; +use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; use risingwave_hummock_sdk::EpochWithGap; use crate::hummock::iterator::HummockIteratorUnion::{First, Fourth, Second, Third}; @@ -472,78 +471,6 @@ impl<'a, B: RustIteratorBuilder> HummockIterator for FromRustIterator<'a, B> { } } -pub struct UserKeyEndBoundedIterator> { - inner: I, - end_bound: Bound>, - is_valid: bool, -} - -impl> UserKeyEndBoundedIterator { - fn new(inner: I, end_bound: Bound>) -> Self { - Self { - inner, - end_bound, - is_valid: false, - } - } - - fn key_out_of_range(&self, key: UserKey<&[u8]>) -> bool { - match &self.end_bound { - Included(end_key) => key > end_key.as_ref(), - Excluded(end_key) => key >= end_key.as_ref(), - Unbounded => false, - } - } - - fn check_is_valid(&mut self) { - self.is_valid = self.inner.is_valid() && !self.key_out_of_range(self.inner.key().user_key); - } -} - -impl> HummockIterator for UserKeyEndBoundedIterator { - type Direction = Forward; - - async fn next(&mut self) -> HummockResult<()> { - self.inner.next().await?; - self.check_is_valid(); - Ok(()) - } - - fn key(&self) -> FullKey<&[u8]> { - debug_assert!(self.is_valid); - self.inner.key() - } - - fn value(&self) -> HummockValue<&[u8]> { - debug_assert!(self.is_valid); - self.inner.value() - } - - fn is_valid(&self) -> bool { - self.is_valid - } - - async fn rewind(&mut self) -> HummockResult<()> { - self.inner.rewind().await?; - self.check_is_valid(); - Ok(()) - } - - async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> { - self.inner.seek(key).await?; - self.check_is_valid(); - Ok(()) - } - - fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { - self.inner.collect_local_statistic(stats) - } - - fn value_meta(&self) -> ValueMeta { - self.inner.value_meta() - } -} - #[derive(PartialEq, Eq, Debug)] pub enum DirectionEnum { Forward, diff --git a/src/storage/src/storage_failpoints/test_iterator.rs b/src/storage/src/storage_failpoints/test_iterator.rs index 5dae813f23d5e..463c20ed469de 100644 --- a/src/storage/src/storage_failpoints/test_iterator.rs +++ b/src/storage/src/storage_failpoints/test_iterator.rs @@ -294,7 +294,6 @@ async fn test_failpoints_user_read_err() { let result = ui.next().await; if result.is_err() { assert!(i < 400); - break; } } assert!(i < 400);