Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Revert "refactor(storage): extract logic of key bounded iterator" #16247

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 41 additions & 16 deletions src/storage/src/hummock/iterator/forward_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Bound;
use std::ops::Bound::*;

use risingwave_common::must_match;
use risingwave_common::util::epoch::MAX_SPILL_TIMES;
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, KeyPayloadType, UserKey, UserKeyRange};
use risingwave_hummock_sdk::key::{FullKey, FullKeyTracker, UserKey, UserKeyRange};
use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch};

use crate::hummock::iterator::{Forward, HummockIterator, UserKeyEndBoundedIterator};
use crate::hummock::iterator::{Forward, HummockIterator};
use crate::hummock::local_version::pinned_version::PinnedVersion;
use crate::hummock::value::HummockValue;
use crate::hummock::HummockResult;
Expand All @@ -29,13 +28,13 @@ use crate::monitor::StoreLocalStatistic;
/// [`UserIterator`] can be used by user directly.
pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
/// Inner table iterator.
iterator: UserKeyEndBoundedIterator<I>,
iterator: I,

// Track the last seen full key
full_key_tracker: FullKeyTracker<Vec<u8>, true>,

/// Start and end bounds of user key.
start_bound: Bound<UserKey<KeyPayloadType>>,
key_range: UserKeyRange,

/// Only reads values if `ts <= self.read_epoch`.
read_epoch: HummockEpoch,
Expand All @@ -47,24 +46,29 @@ pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
_version: Option<PinnedVersion>,

stats: StoreLocalStatistic,

/// Whether the iterator is pointing to a valid position
is_current_pos_valid: bool,
}

// TODO: decide whether this should also impl `HummockIterator`
impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Create [`UserIterator`] with given `read_epoch`.
pub(crate) fn new(
iterator: I,
(start_bound, end_bound): UserKeyRange,
key_range: UserKeyRange,
read_epoch: u64,
min_epoch: u64,
version: Option<PinnedVersion>,
) -> Self {
Self {
iterator: UserKeyEndBoundedIterator::new(iterator, end_bound),
start_bound,
iterator,
key_range,
read_epoch,
min_epoch,
stats: StoreLocalStatistic::default(),
_version: version,
is_current_pos_valid: false,
full_key_tracker: FullKeyTracker::new(FullKey::default()),
}
}
Expand All @@ -83,6 +87,8 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// (may reach to the end and thus not valid)
/// - if `Err(_) ` is returned, it means that some error happened.
pub async fn next(&mut self) -> HummockResult<()> {
// Reset the valid flag to make sure if error happens, `is_valid` should return false.
self.is_current_pos_valid = false;
// Move the iterator to the next step if it is currently potined to a ready entry.
self.iterator.next().await?;

Expand All @@ -99,10 +105,6 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Note: before call the function you need to ensure that the iterator is valid.
pub fn key(&self) -> FullKey<&[u8]> {
assert!(self.is_valid());
debug_assert_eq!(
self.full_key_tracker.latest_full_key.to_ref(),
self.iterator.key()
);
self.full_key_tracker.latest_full_key.to_ref()
}

Expand All @@ -117,10 +119,11 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Resets the iterating position to the beginning.
pub async fn rewind(&mut self) -> HummockResult<()> {
// Reset
self.is_current_pos_valid = false;
self.full_key_tracker = FullKeyTracker::new(FullKey::default());

// Handle range scan
match &self.start_bound {
match &self.key_range.0 {
Included(begin_key) => {
let full_key = FullKey {
user_key: begin_key.clone(),
Expand All @@ -140,10 +143,11 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Resets the iterating position to the first position where the key >= provided key.
pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
// Reset
self.is_current_pos_valid = false;
self.full_key_tracker = FullKeyTracker::new(FullKey::default());

// Handle range scan when key < begin_key
let user_key = match &self.start_bound {
let user_key = match &self.key_range.0 {
Included(begin_key) => {
let begin_key = begin_key.as_ref();
if begin_key > user_key {
Expand All @@ -167,7 +171,7 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {

/// Indicates whether the iterator can be used.
pub fn is_valid(&self) -> bool {
self.iterator.is_valid()
self.is_current_pos_valid
}

pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
Expand All @@ -178,7 +182,11 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Advance the inner iterator to a valid position, in which the entry can be exposed.
/// Iterator will not be advanced if it already pointed to a valid position.
async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> {
while self.iterator.is_valid() {
loop {
if !self.iterator.is_valid() {
break;
}

let full_key = self.iterator.key();
let epoch = full_key.epoch_with_gap.pure_epoch();

Expand All @@ -197,10 +205,15 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {

// A new user key is observed.

if self.user_key_out_of_range(full_key.user_key) {
break;
}

// Handle delete operation
match self.iterator.value() {
HummockValue::Put(_val) => {
self.stats.processed_key_count += 1;
self.is_current_pos_valid = true;
return Ok(());
}
// It means that the key is deleted from the storage.
Expand All @@ -212,8 +225,20 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
}
self.iterator.next().await?;
}

self.is_current_pos_valid = false;
Ok(())
}

// Validate whether the current key is already out of range.
fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool {
// handle range scan
match &self.key_range.1 {
Included(end_key) => user_key > end_key.as_ref(),
Excluded(end_key) => user_key >= end_key.as_ref(),
Unbounded => false,
}
}
}

#[cfg(test)]
Expand Down
77 changes: 2 additions & 75 deletions src/storage/src/hummock/iterator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

use std::future::Future;
use std::marker::PhantomData;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::{Bound, Deref, DerefMut};
use std::ops::{Deref, DerefMut};

use more_asserts::assert_gt;

Expand All @@ -37,7 +36,7 @@ pub mod forward_user;
mod merge_inner;
pub use forward_user::*;
pub use merge_inner::MergeIterator;
use risingwave_hummock_sdk::key::{FullKey, KeyPayloadType, TableKey, UserKey};
use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey};
use risingwave_hummock_sdk::EpochWithGap;

use crate::hummock::iterator::HummockIteratorUnion::{First, Fourth, Second, Third};
Expand Down Expand Up @@ -472,78 +471,6 @@ impl<'a, B: RustIteratorBuilder> HummockIterator for FromRustIterator<'a, B> {
}
}

pub struct UserKeyEndBoundedIterator<I: HummockIterator<Direction = Forward>> {
inner: I,
end_bound: Bound<UserKey<KeyPayloadType>>,
is_valid: bool,
}

impl<I: HummockIterator<Direction = Forward>> UserKeyEndBoundedIterator<I> {
fn new(inner: I, end_bound: Bound<UserKey<KeyPayloadType>>) -> Self {
Self {
inner,
end_bound,
is_valid: false,
}
}

fn key_out_of_range(&self, key: UserKey<&[u8]>) -> bool {
match &self.end_bound {
Included(end_key) => key > end_key.as_ref(),
Excluded(end_key) => key >= end_key.as_ref(),
Unbounded => false,
}
}

fn check_is_valid(&mut self) {
self.is_valid = self.inner.is_valid() && !self.key_out_of_range(self.inner.key().user_key);
}
}

impl<I: HummockIterator<Direction = Forward>> HummockIterator for UserKeyEndBoundedIterator<I> {
type Direction = Forward;

async fn next(&mut self) -> HummockResult<()> {
self.inner.next().await?;
self.check_is_valid();
Ok(())
}

fn key(&self) -> FullKey<&[u8]> {
debug_assert!(self.is_valid);
self.inner.key()
}

fn value(&self) -> HummockValue<&[u8]> {
debug_assert!(self.is_valid);
self.inner.value()
}

fn is_valid(&self) -> bool {
self.is_valid
}

async fn rewind(&mut self) -> HummockResult<()> {
self.inner.rewind().await?;
self.check_is_valid();
Ok(())
}

async fn seek<'a>(&'a mut self, key: FullKey<&'a [u8]>) -> HummockResult<()> {
self.inner.seek(key).await?;
self.check_is_valid();
Ok(())
}

fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
self.inner.collect_local_statistic(stats)
}

fn value_meta(&self) -> ValueMeta {
self.inner.value_meta()
}
}

#[derive(PartialEq, Eq, Debug)]
pub enum DirectionEnum {
Forward,
Expand Down
1 change: 0 additions & 1 deletion src/storage/src/storage_failpoints/test_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ async fn test_failpoints_user_read_err() {
let result = ui.next().await;
if result.is_err() {
assert!(i < 400);
break;
}
}
assert!(i < 400);
Expand Down
Loading