Skip to content

Commit

Permalink
use state machine in user key iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
hzxa21 committed Jan 11, 2024
1 parent edaae7e commit e5efde9
Showing 1 changed file with 171 additions and 94 deletions.
265 changes: 171 additions & 94 deletions src/storage/src/hummock/iterator/forward_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,67 @@ use crate::hummock::value::HummockValue;
use crate::hummock::HummockResult;
use crate::monitor::StoreLocalStatistic;

/// State transitions:
/// Init -> Ready: The initial entry after seek/rewind can be exposed.
/// Init -> Next: The initial entry after seek/rewind cannot be exposed.
/// Init/Next -> Invalid: Iterator has been invalidated.
/// Ready/Next -> Next: Continue to call next on the inner iterator.
/// Next -> Ready: The current entry can be exposed.
enum IteratorState {
/// Initial state on iterator creation
Init,

/// Iterator is pointed to a valid key that can be exposed.
Ready,

/// Iterator is pointed to a key that should not be exposed. next() should be called.
Next,

/// Iterator is no longer valid. Reaons:

Check warning on line 45 in src/storage/src/hummock/iterator/forward_user.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"Reaons" should be "Reasons".
/// - Iterator is pointed to a out-of-range key.
/// - Iteraror meets EOF.
/// - The internal iterator is not valid.
/// - Error happens during iteration.
Invalid,
}

impl IteratorState {
#[inline]
fn is_next(&self) -> bool {
matches!(self, Self::Next)
}

#[inline]
fn is_ready(&self) -> bool {
matches!(self, Self::Ready)
}

#[inline]
fn is_init(&self) -> bool {
matches!(self, Self::Init)
}

#[inline]
fn next(&mut self) {
*self = IteratorState::Next;
}

#[inline]
fn ready(&mut self) {
*self = IteratorState::Ready;
}

#[inline]
fn reset(&mut self) {
*self = IteratorState::Init;
}

#[inline]
fn invalidate(&mut self) {
*self = IteratorState::Invalid;
}
}

/// [`UserIterator`] can be used by user directly.
pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
/// Inner table iterator.
Expand All @@ -37,9 +98,6 @@ pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
/// Last user value
last_val: Bytes,

/// Flag for whether the iterator reach over the right end of the range.
out_of_range: bool,

/// Start and end bounds of user key.
key_range: UserKeyRange,

Expand All @@ -55,6 +113,9 @@ pub struct UserIterator<I: HummockIterator<Direction = Forward>> {
stats: StoreLocalStatistic,

delete_range_iter: ForwardMergeRangeIterator,

/// Current state of the iterator
state: IteratorState,
}

// TODO: decide whether this should also impl `HummockIterator`
Expand All @@ -70,7 +131,6 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
) -> Self {
Self {
iterator,
out_of_range: false,
key_range,
last_key: FullKey::default(),
last_val: Bytes::new(),
Expand All @@ -79,6 +139,7 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
stats: StoreLocalStatistic::default(),
delete_range_iter,
_version: version,
state: IteratorState::Init,
}
}

Expand All @@ -96,84 +157,23 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
}

/// Gets the iterator move to the next step.
/// See `IteratorState` for the state machine details.
///
/// Returned result:
/// - if `Ok(())` is returned, it means that the iterator successfully move to the next position
/// (may reach to the end and thus not valid)
/// - if `Err(_) ` is returned, it means that some error happened.
pub async fn next(&mut self) -> HummockResult<()> {
assert!(self.iterator.is_valid());
// Iterator must be initialized via seek/rewind before calling next is called
assert!(!self.state.is_init());
loop {
self.iterator.next().await?;
if self.update_iter_state().await? {
self.step_one().await?;
if !self.state.is_next() {
return Ok(());
}
}
}

/// Update internal state of the iterator
/// Returned result:
/// - Err: Error happens when updating the state
/// - Ok(false): Successfully update the state but the inner iterator key-value should not be exposed.
/// Caller should continue to call `next()`.
/// - Ok(true): Successfully update the state and the inner iterator key-value should be exposed.
/// Caller should stop to call `next()`.
async fn update_iter_state(&mut self) -> HummockResult<bool> {
if !self.iterator.is_valid() {
// Caller should stop call `next()`.
return Ok(true);
}
let full_key = self.iterator.key();
let epoch = full_key.epoch_with_gap.pure_epoch();

// handle multi-version
if epoch < self.min_epoch || epoch > self.read_epoch {
// Caller should continue call `next()`.
return Ok(false);
}

if self.last_key.user_key.as_ref() != full_key.user_key {
// It is better to early return here if the user key is already
// out of range to avoid unnecessary access on the range tomestones
// via `delete_range_iter`.
// For example, if we are iterating with key range [0x0a, 0x0c) and the
// current key is 0xff, we will access range tombstones in [0x0c, 0xff],
// which is a waste of work.
if self.key_out_of_range() {
self.out_of_range = true;
// Caller should stop call `next()`.
return Ok(true);
}

self.last_key = full_key.copy_into();
// handle delete operation
match self.iterator.value() {
HummockValue::Put(val) => {
self.delete_range_iter.next_until(full_key.user_key).await?;
if self.delete_range_iter.current_epoch() >= epoch {
self.stats.skip_delete_key_count += 1;
} else {
self.last_val = Bytes::copy_from_slice(val);
self.stats.processed_key_count += 1;
// Caller should stop call `next()`.
return Ok(true);
}
}
// It means that the key is deleted from the storage.
// Deleted kv and the previous versions (if any) of the key should not be
// returned to user.
HummockValue::Delete => {
self.stats.skip_delete_key_count += 1;
}
}
} else {
self.stats.skip_multi_version_key_count += 1;
}

// Caller should continue call `next()`.
Ok(false)
}

/// Returns the key with the newest version. Thus no version in it, and only the `user_key` will
/// be returned.
///
Expand All @@ -196,8 +196,7 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {

/// Resets the iterating position to the beginning.
pub async fn rewind(&mut self) -> HummockResult<()> {
// Reset
self.out_of_range = false;
self.state.reset();

// Handle range scan
match &self.key_range.0 {
Expand All @@ -208,11 +207,12 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
};
self.iterator.seek(full_key.to_ref()).await?;
if !self.iterator.is_valid() {
self.state.invalidate();
return Ok(());
}

if self.key_out_of_range() {
self.out_of_range = true;
self.state.invalidate();
return Ok(());
}

Expand All @@ -222,28 +222,25 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
Unbounded => {
self.iterator.rewind().await?;
if !self.iterator.is_valid() {
self.state.invalidate();
return Ok(());
}

self.delete_range_iter.rewind().await?;
}
};

// Handle multi-version
self.last_key = FullKey::default();
if self.update_iter_state().await? {
// The current iterator entry can be exposed
Ok(())
} else {
// Need to find the next entry to be exposed
self.next().await
self.step_one().await?;
if self.state.is_next() {
self.next().await?;
}
Ok(())
}

/// Resets the iterating position to the first position where the key >= provided key.
pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> {
// Reset
self.out_of_range = false;
self.state.reset();

// Handle range scan when key < begin_key
let user_key = match &self.key_range.0 {
Expand All @@ -265,39 +262,119 @@ impl<I: HummockIterator<Direction = Forward>> UserIterator<I> {
};
self.iterator.seek(full_key).await?;
if !self.iterator.is_valid() {
self.state.invalidate();
return Ok(());
}

if self.key_out_of_range() {
self.out_of_range = true;
self.state.invalidate();
return Ok(());
}

self.delete_range_iter.seek(full_key.user_key).await?;

// Handle multi-version
self.last_key = FullKey::default();
if self.update_iter_state().await? {
// The current iterator entry can be exposed
Ok(())
} else {
// Need to find the next entry to be exposed
self.next().await
self.step_one().await?;
if self.state.is_next() {
self.next().await?;
}
Ok(())
}

/// Indicates whether the iterator can be used.
pub fn is_valid(&self) -> bool {
// Handle range scan
// key >= begin_key is guaranteed by seek/rewind function
(!self.out_of_range) && self.iterator.is_valid()
self.state.is_ready()
}

pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) {
stats.add(&self.stats);
self.iterator.collect_local_statistic(stats);
}

/// Transition iterator state.
/// See `IteratorState` for the state machine details.
async fn step_one(&mut self) -> HummockResult<()> {
match self.state {
IteratorState::Init => {
self.last_key = FullKey::default();
self.on_next().await?;
Ok(())
}
IteratorState::Ready => {
self.state.next();
Ok(())
}
IteratorState::Next => {
self.iterator.next().await?;
self.on_next().await?;
Ok(())
}
IteratorState::Invalid => {
// End state
Ok(())
}
}
}

async fn on_next(&mut self) -> HummockResult<()> {
if !self.iterator.is_valid() {
self.state.invalidate();
return Ok(());
}

let full_key = self.iterator.key();
let epoch = full_key.epoch_with_gap.pure_epoch();

// Handle epoch visibility
if epoch < self.min_epoch || epoch > self.read_epoch {
self.state.next();
return Ok(());
}

// It is better to early return here if the user key is already
// out of range to avoid unnecessary access on the range tomestones
// via `delete_range_iter`.
// For example, if we are iterating with key range [0x0a, 0x0c) and the
// current key is 0xff, we will access range tombstones in [0x0c, 0xff],
// which is a waste of work.
if self.key_out_of_range() {
self.state.invalidate();
return Ok(());
}

// Skip older version entry for the same user key
if self.last_key.user_key.as_ref() == full_key.user_key {
self.stats.skip_multi_version_key_count += 1;
self.state.next();
return Ok(());
}

self.last_key = full_key.copy_into();

// Handle delete operation
match self.iterator.value() {
HummockValue::Put(val) => {
self.delete_range_iter.next_until(full_key.user_key).await?;
if self.delete_range_iter.current_epoch() >= epoch {
self.stats.skip_delete_key_count += 1;
} else {
self.last_val = Bytes::copy_from_slice(val);
self.stats.processed_key_count += 1;
self.state.ready();
return Ok(());
}
}
// It means that the key is deleted from the storage.
// Deleted kv and the previous versions (if any) of the key should not be
// returned to user.
HummockValue::Delete => {
self.stats.skip_delete_key_count += 1;
}
}

self.state.next();
Ok(())
}

// Validate whether the current key is already out of range.
fn key_out_of_range(&self) -> bool {
assert!(self.iterator.is_valid());
Expand Down

0 comments on commit e5efde9

Please sign in to comment.