Skip to content

Commit

Permalink
Revert "refactor(storage): extract logic of key bounded iterator (#16027)"
Browse files Browse the repository at this point in the history

This reverts commit 65390d9.
  • Loading branch information
wenym1 authored Apr 11, 2024
1 parent a4a944d commit 639cbe5
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 92 deletions.
57 changes: 41 additions & 16 deletions src/storage/src/hummock/iterator/forward_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;
use std::ops::Bound::*;

use risingwave_common::must_match;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, KeyPayloadType, UserKey, UserKeyRange};
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, UserKeyRange};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};

use crate::hummock::iterator::{Forward, HummockIterator, UserKeyEndBoundedIterator};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::value::HummockValue;
use crate::hummock::HummockResult;
Expand All @@ -29,13 +28,13 @@ use crate::monitor::StoreLocalStatistic;
/// [`UserIterator`] can be used by user directly.
pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
/// Inner table iterator.
iterator: UserKeyEndBoundedIterator<I>,
iterator: I,

// Track the last seen full key
full_key_tracker: FullKeyTracker<Vec<u8>, true>,

/// Start and end bounds of user key.
start_bound: Bound<UserKey<KeyPayloadType>>,
key_range: UserKeyRange,

/// Only reads values if `ts <= self.read_epoch`.
read_epoch: HummockEpoch,
Expand All @@ -47,24 +46,29 @@ pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
_version: Option<PinnedVersion>,

stats: StoreLocalStatistic,

/// Whether the iterator is pointing to a valid position
is_current_pos_valid: bool,
}

// TODO: decide whether this should also impl `HummockIterator`
impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Create [`UserIterator`] with given `read_epoch`.
pub(crate) fn new(
iterator: I,
(start_bound, end_bound): UserKeyRange,
key_range: UserKeyRange,
read_epoch: u64,
min_epoch: u64,
version: Option<PinnedVersion>,
) -> Self {
Self {
iterator: UserKeyEndBoundedIterator::new(iterator, end_bound),
start_bound,
iterator,
key_range,
read_epoch,
min_epoch,
stats: StoreLocalStatistic::default(),
_version: version,
is_current_pos_valid: false,
full_key_tracker: FullKeyTracker::new(FullKey::default()),
}
}
Expand All @@ -83,6 +87,8 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// (may reach to the end and thus not valid)
/// - if `Err(_)` is returned, it means that some error happened.
pub async fn next(&mut self) -> HummockResult<()> {
// Reset the valid flag so that `is_valid` returns false if an error happens.
self.is_current_pos_valid = false;
// Move the iterator to the next step if it is currently pointing to a ready entry.
self.iterator.next().await?;

Expand All @@ -99,10 +105,6 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Note: before calling this function you need to ensure that the iterator is valid.
pub fn key(&self) -> FullKey<&[u8]> {
assert!(self.is_valid());
debug_assert_eq!(
self.full_key_tracker.latest_full_key.to_ref(),
self.iterator.key()
);
self.full_key_tracker.latest_full_key.to_ref()
}

Expand All @@ -117,10 +119,11 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Resets the iterating position to the beginning.
pub async fn rewind(&mut self) -> HummockResult<()> {
// Reset
self.is_current_pos_valid = false;
self.full_key_tracker = FullKeyTracker::new(FullKey::default());

// Handle range scan
match &self.start_bound {
match &self.key_range.0 {
Included(begin_key) => {
let full_key = FullKey {
user_key: begin_key.clone(),
Expand All @@ -140,10 +143,11 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Resets the iterating position to the first position where the key >= provided key.
pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
// Reset
self.is_current_pos_valid = false;
self.full_key_tracker = FullKeyTracker::new(FullKey::default());

// Handle range scan when key < begin_key
let user_key = match &self.start_bound {
let user_key = match &self.key_range.0 {
Included(begin_key) => {
let begin_key = begin_key.as_ref();
if begin_key > user_key {
Expand All @@ -167,7 +171,7 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {

/// Indicates whether the iterator can be used.
pub fn is_valid(&self) -> bool {
self.iterator.is_valid()
self.is_current_pos_valid
}

pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
Expand All @@ -178,7 +182,11 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Advance the inner iterator to a valid position, in which the entry can be exposed.
/// The iterator will not be advanced if it already points to a valid position.
async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> {
while self.iterator.is_valid() {
loop {
if !self.iterator.is_valid() {
break;
}

let full_key = self.iterator.key();
let epoch = full_key.epoch_with_gap.pure_epoch();

Expand All @@ -197,10 +205,15 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {

// A new user key is observed.

if self.user_key_out_of_range(full_key.user_key) {
break;
}

// Handle delete operation
match self.iterator.value() {
HummockValue::Put(_val) => {
self.stats.processed_key_count += 1;
self.is_current_pos_valid = true;
return Ok(());
}
// It means that the key is deleted from the storage.
Expand All @@ -212,8 +225,20 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
}
self.iterator.next().await?;
}

self.is_current_pos_valid = false;
Ok(())
}

/// Reports whether `user_key` lies past the end bound of the configured key range.
///
/// Returns `true` when the key is strictly greater than an inclusive end key,
/// or greater than or equal to an exclusive end key. An unbounded end never
/// excludes any key.
fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool {
    // The second element of the range tuple is the end bound of the scan.
    let end_bound = &self.key_range.1;
    match end_bound {
        Unbounded => false,
        Included(last_key) => user_key > last_key.as_ref(),
        Excluded(limit_key) => user_key >= limit_key.as_ref(),
    }
}
}

#[cfg(test)]
Expand Down
77 changes: 2 additions & 75 deletions src/storage/src/hummock/iterator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

use std::future::Future;
use std::marker::PhantomData;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, Deref, DerefMut};
use std::ops::{Deref, DerefMut};

use more_asserts::assert_gt;

Expand All @@ -37,7 +36,7 @@ pub mod forward_user;
mod merge_inner;
pub use forward_user::*;
pub use merge_inner::MergeIterator;
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, TableKey, UserKey};
use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
use risingwave_hummock_sdk::EpochWithGap;

use crate::hummock::iterator::HummockIteratorUnion::{First, Fourth, Second, Third};
Expand Down Expand Up @@ -472,78 +471,6 @@ impl<'a, B: RustIteratorBuilder> HummockIterator for FromRustIterator<'a, B> {
}
}

/// A forward iterator wrapper that stops exposing entries once the inner
/// iterator's user key passes `end_bound`.
pub struct UserKeyEndBoundedIterator<I: HummockIterator<Direction = Forward>> {
/// The wrapped forward iterator that every operation is delegated to.
inner: I,
/// End bound on the user key; keys past this bound are treated as invalid positions.
end_bound: Bound<UserKey<KeyPayloadType>>,
/// Cached validity of the current position: the inner iterator is valid and its
/// user key is not out of range. Recomputed after each successful `next`/`rewind`/`seek`.
is_valid: bool,
}

impl<I: HummockIterator<Direction = Forward>> UserKeyEndBoundedIterator<I> {
    /// Wraps `inner` so that it is only considered valid while its user key
    /// stays within `end_bound`.
    ///
    /// The wrapper starts out invalid; validity is first computed by
    /// `rewind`, `seek` or `next`.
    fn new(inner: I, end_bound: Bound<UserKey<KeyPayloadType>>) -> Self {
        let is_valid = false;
        Self {
            inner,
            end_bound,
            is_valid,
        }
    }

    /// Returns `true` when `key` lies past the end bound: strictly greater than
    /// an inclusive end key, or greater than or equal to an exclusive end key.
    fn key_out_of_range(&self, key: UserKey<&[u8]>) -> bool {
        match &self.end_bound {
            Unbounded => false,
            Included(last_key) => key > last_key.as_ref(),
            Excluded(limit_key) => key >= limit_key.as_ref(),
        }
    }

    /// Refreshes the cached validity flag from the inner iterator's current position.
    fn check_is_valid(&mut self) {
        // The inner key may only be inspected while the inner iterator is valid.
        if !self.inner.is_valid() {
            self.is_valid = false;
            return;
        }
        let past_end = self.key_out_of_range(self.inner.key().user_key);
        self.is_valid = !past_end;
    }
}

/// `HummockIterator` implementation that delegates to the inner iterator and
/// refreshes the cached end-bound validity after every positioning operation.
///
/// Every positioning operation (`next`, `rewind`, `seek`) clears the cached
/// validity flag before delegating. If the inner call fails and the method
/// returns early through `?`, `is_valid()` therefore reports `false` instead of
/// a stale `true` left over from the previous position.
impl<I: HummockIterator<Direction = Forward>> HummockIterator for UserKeyEndBoundedIterator<I> {
    type Direction = Forward;

    /// Advances the inner iterator and recomputes validity against the end bound.
    ///
    /// On error the iterator is left invalid.
    async fn next(&mut self) -> HummockResult<()> {
        // Invalidate first: an early return on error must not leave a stale `true`.
        self.is_valid = false;
        self.inner.next().await?;
        self.check_is_valid();
        Ok(())
    }

    /// Returns the inner iterator's current full key.
    ///
    /// Must only be called while `is_valid()` is `true` (checked in debug builds).
    fn key(&self) -> FullKey<&[u8]> {
        debug_assert!(self.is_valid);
        self.inner.key()
    }

    /// Returns the inner iterator's current value.
    ///
    /// Must only be called while `is_valid()` is `true` (checked in debug builds).
    fn value(&self) -> HummockValue<&[u8]> {
        debug_assert!(self.is_valid);
        self.inner.value()
    }

    /// Returns the cached validity: the inner iterator is valid and its user key
    /// has not passed the end bound.
    fn is_valid(&self) -> bool {
        self.is_valid
    }

    /// Rewinds the inner iterator and recomputes validity against the end bound.
    ///
    /// On error the iterator is left invalid.
    async fn rewind(&mut self) -> HummockResult<()> {
        // Invalidate first: an early return on error must not leave a stale `true`.
        self.is_valid = false;
        self.inner.rewind().await?;
        self.check_is_valid();
        Ok(())
    }

    /// Seeks the inner iterator to `key` and recomputes validity against the end bound.
    ///
    /// On error the iterator is left invalid.
    async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
        // Invalidate first: an early return on error must not leave a stale `true`.
        self.is_valid = false;
        self.inner.seek(key).await?;
        self.check_is_valid();
        Ok(())
    }

    /// Forwards statistic collection to the inner iterator.
    fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
        self.inner.collect_local_statistic(stats)
    }

    /// Forwards to the inner iterator's value metadata.
    fn value_meta(&self) -> ValueMeta {
        self.inner.value_meta()
    }
}

#[derive(PartialEq, Eq, Debug)]
pub enum DirectionEnum {
Forward,
Expand Down
1 change: 0 additions & 1 deletion src/storage/src/storage_failpoints/test_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ async fn test_failpoints_user_read_err() {
let result = ui.next().await;
if result.is_err() {
assert!(i < 400);
break;
}
}
assert!(i < 400);
Expand Down

0 comments on commit 639cbe5

Please sign in to comment.