From e5efde99fecf3a5f04e17906757392f0f0694582 Mon Sep 17 00:00:00 2001 From: Patrick Huang Date: Thu, 11 Jan 2024 22:53:50 +0800 Subject: [PATCH] use state machine in user key iterator --- .../src/hummock/iterator/forward_user.rs | 265 +++++++++++------- 1 file changed, 171 insertions(+), 94 deletions(-) diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index 5895e4c7e5a34..469778e529a67 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -26,6 +26,67 @@ use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; use crate::monitor::StoreLocalStatistic; +/// State transitions: +/// Init -> Ready: The initial entry after seek/rewind can be exposed. +/// Init -> Next: The initial entry after seek/rewind cannot be exposed. +/// Init/Next -> Invalid: Iterator has been invalidated. +/// Ready/Next -> Next: Continue to call next on the inner iterator. +/// Next -> Ready: The current entry can be exposed. +enum IteratorState { + /// Initial state on iterator creation + Init, + + /// Iterator is pointed to a valid key that can be exposed. + Ready, + + /// Iterator is pointed to a key that should not be exposed. next() should be called. + Next, + + /// Iterator is no longer valid. Reaons: + /// - Iterator is pointed to a out-of-range key. + /// - Iteraror meets EOF. + /// - The internal iterator is not valid. + /// - Error happens during iteration. + Invalid, +} + +impl IteratorState { + #[inline] + fn is_next(&self) -> bool { + matches!(self, Self::Next) + } + + #[inline] + fn is_ready(&self) -> bool { + matches!(self, Self::Ready) + } + + #[inline] + fn is_init(&self) -> bool { + matches!(self, Self::Init) + } + + #[inline] + fn next(&mut self) { + *self = IteratorState::Next; + } + + #[inline] + fn ready(&mut self) { + *self = IteratorState::Ready; + } + + #[inline] + fn reset(&mut self) { + *self = IteratorState::Init; + } + + #[inline] + fn invalidate(&mut self) { + *self = IteratorState::Invalid; + } +} + /// [`UserIterator`] can be used by user directly. pub struct UserIterator> { /// Inner table iterator. @@ -37,9 +98,6 @@ pub struct UserIterator> { /// Last user value last_val: Bytes, - /// Flag for whether the iterator reach over the right end of the range. - out_of_range: bool, - /// Start and end bounds of user key. key_range: UserKeyRange, @@ -55,6 +113,9 @@ pub struct UserIterator> { stats: StoreLocalStatistic, delete_range_iter: ForwardMergeRangeIterator, + + /// Current state of the iterator + state: IteratorState, } // TODO: decide whether this should also impl `HummockIterator` @@ -70,7 +131,6 @@ impl> UserIterator { ) -> Self { Self { iterator, - out_of_range: false, key_range, last_key: FullKey::default(), last_val: Bytes::new(), @@ -79,6 +139,7 @@ impl> UserIterator { stats: StoreLocalStatistic::default(), delete_range_iter, _version: version, + state: IteratorState::Init, } } @@ -96,84 +157,23 @@ impl> UserIterator { } /// Gets the iterator move to the next step. + /// See `IteratorState` for the state machine details. /// /// Returned result: /// - if `Ok(())` is returned, it means that the iterator successfully move to the next position /// (may reach to the end and thus not valid) /// - if `Err(_) ` is returned, it means that some error happened. pub async fn next(&mut self) -> HummockResult<()> { - assert!(self.iterator.is_valid()); + // Iterator must be initialized via seek/rewind before calling next is called + assert!(!self.state.is_init()); loop { - self.iterator.next().await?; - if self.update_iter_state().await? { + self.step_one().await?; + if !self.state.is_next() { return Ok(()); } } } - /// Update internal state of the iterator - /// Returned result: - /// - Err: Error happens when updating the state - /// - Ok(false): Successfully update the state but the inner iterator key-value should not be exposed. - /// Caller should continue to call `next()`. - /// - Ok(true): Successfully update the state and the inner iterator key-value should be exposed. - /// Caller should stop to call `next()`. - async fn update_iter_state(&mut self) -> HummockResult { - if !self.iterator.is_valid() { - // Caller should stop call `next()`. - return Ok(true); - } - let full_key = self.iterator.key(); - let epoch = full_key.epoch_with_gap.pure_epoch(); - - // handle multi-version - if epoch < self.min_epoch || epoch > self.read_epoch { - // Caller should continue call `next()`. - return Ok(false); - } - - if self.last_key.user_key.as_ref() != full_key.user_key { - // It is better to early return here if the user key is already - // out of range to avoid unnecessary access on the range tomestones - // via `delete_range_iter`. - // For example, if we are iterating with key range [0x0a, 0x0c) and the - // current key is 0xff, we will access range tombstones in [0x0c, 0xff], - // which is a waste of work. - if self.key_out_of_range() { - self.out_of_range = true; - // Caller should stop call `next()`. - return Ok(true); - } - - self.last_key = full_key.copy_into(); - // handle delete operation - match self.iterator.value() { - HummockValue::Put(val) => { - self.delete_range_iter.next_until(full_key.user_key).await?; - if self.delete_range_iter.current_epoch() >= epoch { - self.stats.skip_delete_key_count += 1; - } else { - self.last_val = Bytes::copy_from_slice(val); - self.stats.processed_key_count += 1; - // Caller should stop call `next()`. - return Ok(true); - } - } - // It means that the key is deleted from the storage. - // Deleted kv and the previous versions (if any) of the key should not be - // returned to user. - HummockValue::Delete => { - self.stats.skip_delete_key_count += 1; - } - } - } else { - self.stats.skip_multi_version_key_count += 1; - } - - // Caller should continue call `next()`. - Ok(false) - } - /// Returns the key with the newest version. Thus no version in it, and only the `user_key` will /// be returned. /// @@ -196,8 +196,7 @@ impl> UserIterator { /// Resets the iterating position to the beginning. pub async fn rewind(&mut self) -> HummockResult<()> { - // Reset - self.out_of_range = false; + self.state.reset(); // Handle range scan match &self.key_range.0 { @@ -208,11 +207,12 @@ impl> UserIterator { }; self.iterator.seek(full_key.to_ref()).await?; if !self.iterator.is_valid() { + self.state.invalidate(); return Ok(()); } if self.key_out_of_range() { - self.out_of_range = true; + self.state.invalidate(); return Ok(()); } @@ -222,6 +222,7 @@ impl> UserIterator { Unbounded => { self.iterator.rewind().await?; if !self.iterator.is_valid() { + self.state.invalidate(); return Ok(()); } @@ -229,21 +230,17 @@ impl> UserIterator { } }; - // Handle multi-version - self.last_key = FullKey::default(); - if self.update_iter_state().await? { - // The current iterator entry can be exposed - Ok(()) - } else { - // Need to find the next entry to be exposed - self.next().await + self.step_one().await?; + if self.state.is_next() { + self.next().await?; } + Ok(()) } /// Resets the iterating position to the first position where the key >= provided key. pub async fn seek(&mut self, user_key: UserKey<&[u8]>) -> HummockResult<()> { // Reset - self.out_of_range = false; + self.state.reset(); // Handle range scan when key < begin_key let user_key = match &self.key_range.0 { @@ -265,32 +262,27 @@ impl> UserIterator { }; self.iterator.seek(full_key).await?; if !self.iterator.is_valid() { + self.state.invalidate(); return Ok(()); } if self.key_out_of_range() { - self.out_of_range = true; + self.state.invalidate(); return Ok(()); } self.delete_range_iter.seek(full_key.user_key).await?; - // Handle multi-version - self.last_key = FullKey::default(); - if self.update_iter_state().await? { - // The current iterator entry can be exposed - Ok(()) - } else { - // Need to find the next entry to be exposed - self.next().await + self.step_one().await?; + if self.state.is_next() { + self.next().await?; } + Ok(()) } /// Indicates whether the iterator can be used. pub fn is_valid(&self) -> bool { - // Handle range scan - // key >= begin_key is guaranteed by seek/rewind function - (!self.out_of_range) && self.iterator.is_valid() + self.state.is_ready() } pub fn collect_local_statistic(&self, stats: &mut StoreLocalStatistic) { @@ -298,6 +290,91 @@ impl> UserIterator { self.iterator.collect_local_statistic(stats); } + /// Transition iterator state. + /// See `IteratorState` for the state machine details. + async fn step_one(&mut self) -> HummockResult<()> { + match self.state { + IteratorState::Init => { + self.last_key = FullKey::default(); + self.on_next().await?; + Ok(()) + } + IteratorState::Ready => { + self.state.next(); + Ok(()) + } + IteratorState::Next => { + self.iterator.next().await?; + self.on_next().await?; + Ok(()) + } + IteratorState::Invalid => { + // End state + Ok(()) + } + } + } + + async fn on_next(&mut self) -> HummockResult<()> { + if !self.iterator.is_valid() { + self.state.invalidate(); + return Ok(()); + } + + let full_key = self.iterator.key(); + let epoch = full_key.epoch_with_gap.pure_epoch(); + + // Handle epoch visibility + if epoch < self.min_epoch || epoch > self.read_epoch { + self.state.next(); + return Ok(()); + } + + // It is better to early return here if the user key is already + // out of range to avoid unnecessary access on the range tomestones + // via `delete_range_iter`. + // For example, if we are iterating with key range [0x0a, 0x0c) and the + // current key is 0xff, we will access range tombstones in [0x0c, 0xff], + // which is a waste of work. + if self.key_out_of_range() { + self.state.invalidate(); + return Ok(()); + } + + // Skip older version entry for the same user key + if self.last_key.user_key.as_ref() == full_key.user_key { + self.stats.skip_multi_version_key_count += 1; + self.state.next(); + return Ok(()); + } + + self.last_key = full_key.copy_into(); + + // Handle delete operation + match self.iterator.value() { + HummockValue::Put(val) => { + self.delete_range_iter.next_until(full_key.user_key).await?; + if self.delete_range_iter.current_epoch() >= epoch { + self.stats.skip_delete_key_count += 1; + } else { + self.last_val = Bytes::copy_from_slice(val); + self.stats.processed_key_count += 1; + self.state.ready(); + return Ok(()); + } + } + // It means that the key is deleted from the storage. + // Deleted kv and the previous versions (if any) of the key should not be + // returned to user. + HummockValue::Delete => { + self.stats.skip_delete_key_count += 1; + } + } + + self.state.next(); + Ok(()) + } + // Validate whether the current key is already out of range. fn key_out_of_range(&self) -> bool { assert!(self.iterator.is_valid());