Skip to content

Commit

Permalink
refactor: fix incorrect skip multi-version key metric and save unnecc…
Browse files Browse the repository at this point in the history
…essary key comparison (#14494)
  • Loading branch information
hzxa21 authored Jan 16, 2024
1 parent 25d5bd4 commit da5e90d
Showing 1 changed file with 96 additions and 76 deletions.
172 changes: 96 additions & 76 deletions src/storage/src/hummock/iterator/forward_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
/// Last user value
last_val: Bytes,

/// Flag for whether the iterator reach over the right end of the range.
out_of_range: bool,

/// Start and end bounds of user key.
key_range: UserKeyRange,

Expand All @@ -55,6 +52,9 @@ pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
stats: StoreLocalStatistic,

delete_range_iter: ForwardMergeRangeIterator,

/// Whether the iterator is pointing to a valid position
is_current_pos_valid: bool,
}

// TODO: decide whether this should also impl `HummockIterator`
Expand All @@ -70,7 +70,6 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
) -> Self {
Self {
iterator,
out_of_range: false,
key_range,
last_key: FullKey::default(),
last_val: Bytes::new(),
Expand All @@ -79,6 +78,7 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
stats: StoreLocalStatistic::default(),
delete_range_iter,
_version: version,
is_current_pos_valid: false,
}
}

Expand All @@ -96,62 +96,20 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
}

/// Gets the iterator move to the next step.
/// See `IteratorState` for the state machine details.
///
/// Returned result:
/// - if `Ok(())` is returned, it means that the iterator successfully move to the next position
/// (may reach to the end and thus not valid)
/// - if `Err(_) ` is returned, it means that some error happened.
pub async fn next(&mut self) -> HummockResult<()> {
while self.iterator.is_valid() {
let full_key = self.iterator.key();
let epoch = full_key.epoch_with_gap.pure_epoch();
// Reset the valid flag to make sure if error happens, `is_valid` should return false.
self.is_current_pos_valid = false;
// Move the iterator to the next step if it is currently potined to a ready entry.
self.iterator.next().await?;

// handle multi-version
if epoch < self.min_epoch || epoch > self.read_epoch {
self.iterator.next().await?;
continue;
}

if self.last_key.user_key.as_ref() != full_key.user_key {
// It is better to early return here if the user key is already
// out of range to avoid unnecessary access on the range tomestones
// via `delete_range_iter`.
// For example, if we are iterating with key range [0x0a, 0x0c) and the
// current key is 0xff, we will access range tombstones in [0x0c, 0xff],
// which is a waste of work.
if self.key_out_of_range() {
self.out_of_range = true;
return Ok(());
}

self.last_key = full_key.copy_into();
// handle delete operation
match self.iterator.value() {
HummockValue::Put(val) => {
self.delete_range_iter.next_until(full_key.user_key).await?;
if self.delete_range_iter.current_epoch() >= epoch {
self.stats.skip_delete_key_count += 1;
} else {
self.last_val = Bytes::copy_from_slice(val);
self.stats.processed_key_count += 1;
return Ok(());
}
}
// It means that the key is deleted from the storage.
// Deleted kv and the previous versions (if any) of the key should not be
// returned to user.
HummockValue::Delete => {
self.stats.skip_delete_key_count += 1;
}
}
} else {
self.stats.skip_multi_version_key_count += 1;
}

self.iterator.next().await?;
}

Ok(()) // not valid, EOF
// Check and move onto the next valid position if any
self.try_advance_to_next_valid().await
}

/// Returns the key with the newest version. Thus no version in it, and only the `user_key` will
Expand All @@ -177,7 +135,8 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
/// Resets the iterating position to the beginning.
pub async fn rewind(&mut self) -> HummockResult<()> {
// Reset
self.out_of_range = false;
self.is_current_pos_valid = false;
self.last_key = FullKey::default();

// Handle range scan
match &self.key_range.0 {
Expand All @@ -187,19 +146,24 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES),
};
self.iterator.seek(full_key.to_ref()).await?;
// We check `is_valid` and `user_key_out_of_range` here to avoid `delete_range_iter.seek`
// as much as possible because it can be expensive. We can simplify this logic after delete
// range is deprecated and replaced with vnode watermark.
if !self.iterator.is_valid() {
return Ok(());
}

if self.key_out_of_range() {
self.out_of_range = true;
if self.user_key_out_of_range(self.iterator.key().user_key) {
return Ok(());
}

self.delete_range_iter.seek(begin_key.as_ref()).await?;
}
Excluded(_) => unimplemented!("excluded begin key is not supported"),
Unbounded => {
// We check `is_valid` here to avoid `delete_range_iter.rewind`
// as much as possible because it can be expensive. We can simplify this logic after delete
// range is deprecated and replaced with vnode watermark.
self.iterator.rewind().await?;
if !self.iterator.is_valid() {
return Ok(());
Expand All @@ -209,16 +173,14 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
}
};

// Handle multi-version
self.last_key = FullKey::default();
// Handles range scan when key > end_key
self.next().await
self.try_advance_to_next_valid().await
}

/// Resets the iterating position to the first position where the key >= provided key.
pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
// Reset
self.out_of_range = false;
self.is_current_pos_valid = false;
self.last_key = FullKey::default();

// Handle range scan when key < begin_key
let user_key = match &self.key_range.0 {
Expand All @@ -239,44 +201,102 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
epoch_with_gap: EpochWithGap::new(self.read_epoch, MAX_SPILL_TIMES),
};
self.iterator.seek(full_key).await?;
// We check `is_valid` and `user_key_out_of_range` here to avoid `delete_range_iter.seek`
// as much as possible because it can be expensive. We can simplify this logic after delete
// range is deprecated and replaced with vnode watermark.
if !self.iterator.is_valid() {
return Ok(());
}

if self.key_out_of_range() {
self.out_of_range = true;
if self.user_key_out_of_range(self.iterator.key().user_key) {
return Ok(());
}

self.delete_range_iter.seek(full_key.user_key).await?;

// Handle multi-version
self.last_key = FullKey::default();
// Handle range scan when key > end_key

self.next().await
self.try_advance_to_next_valid().await
}

/// Indicates whether the iterator can be used.
pub fn is_valid(&self) -> bool {
// Handle range scan
// key >= begin_key is guaranteed by seek/rewind function
(!self.out_of_range) && self.iterator.is_valid()
self.is_current_pos_valid
}

pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
stats.add(&self.stats);
self.iterator.collect_local_statistic(stats);
}

/// Advance the inner iterator to a valid position, in which the entry can be exposed.
/// Iterator will not be advanced if it already pointed to a valid position.
async fn try_advance_to_next_valid(&mut self) -> HummockResult<()> {
loop {
if !self.iterator.is_valid() {
break;
}

let full_key = self.iterator.key();
let epoch = full_key.epoch_with_gap.pure_epoch();

// Handle epoch visibility
if epoch < self.min_epoch || epoch > self.read_epoch {
self.iterator.next().await?;
continue;
}

// Skip older version entry for the same user key
if self.last_key.user_key.as_ref() == full_key.user_key {
self.stats.skip_multi_version_key_count += 1;
self.iterator.next().await?;
continue;
}

// A new user key is observed.
self.last_key = full_key.copy_into();

// It is better to early return here if the user key is already
// out of range to avoid unnecessary access on the range tomestones
// via `delete_range_iter`.
// For example, if we are iterating with key range [0x0a, 0x0c) and the
// current key is 0xff, we will access range tombstones in [0x0c, 0xff],
// which is a waste of work.
if self.user_key_out_of_range(full_key.user_key) {
break;
}

// Handle delete operation
match self.iterator.value() {
HummockValue::Put(val) => {
self.delete_range_iter.next_until(full_key.user_key).await?;
if self.delete_range_iter.current_epoch() >= epoch {
self.stats.skip_delete_key_count += 1;
} else {
self.last_val = Bytes::copy_from_slice(val);
self.stats.processed_key_count += 1;
self.is_current_pos_valid = true;
return Ok(());
}
}
// It means that the key is deleted from the storage.
// Deleted kv and the previous versions (if any) of the key should not be
// returned to user.
HummockValue::Delete => {
self.stats.skip_delete_key_count += 1;
}
}
self.iterator.next().await?;
}

self.is_current_pos_valid = false;
Ok(())
}

// Validate whether the current key is already out of range.
fn key_out_of_range(&self) -> bool {
assert!(self.iterator.is_valid());
let current_user_key = self.iterator.key().user_key;
fn user_key_out_of_range(&self, user_key: UserKey<&[u8]>) -> bool {
// handle range scan
match &self.key_range.1 {
Included(end_key) => current_user_key > end_key.as_ref(),
Excluded(end_key) => current_user_key >= end_key.as_ref(),
Included(end_key) => user_key > end_key.as_ref(),
Excluded(end_key) => user_key >= end_key.as_ref(),
Unbounded => false,
}
}
Expand Down

0 comments on commit da5e90d

Please sign in to comment.