Skip to content

Commit

Permalink
rev iterator
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Mar 20, 2024
1 parent b139383 commit f4b5eca
Show file tree
Hide file tree
Showing 12 changed files with 808 additions and 127 deletions.
15 changes: 7 additions & 8 deletions src/storage/src/hummock/iterator/backward_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ pub struct BackwardUserIterator<I: HummockIterator<Direction = Backward>> {

impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
/// Creates [`BackwardUserIterator`] with given `read_epoch`.
#[cfg(test)]
pub(crate) fn with_epoch(
pub fn new(
iterator: I,
key_range: UserKeyRange,
read_epoch: u64,
Expand Down Expand Up @@ -196,15 +195,15 @@ impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
/// `rewind` or `seek` methods are called.
///
/// Note: before call the function you need to ensure that the iterator is valid.
pub fn key(&self) -> &FullKey<Bytes> {
pub fn key(&self) -> FullKey<&[u8]> {
assert!(self.is_valid());
&self.last_key
self.last_key.to_ref()
}

/// The returned value is in the form of user value.
///
/// Note: before calling the function you need to ensure that the iterator is valid.
pub fn value(&self) -> &Bytes {
/// Note: before call the function you need to ensure that the iterator is valid.
pub fn value(&self) -> &[u8] {
assert!(self.is_valid());
&self.last_val
}
Expand Down Expand Up @@ -277,7 +276,7 @@ impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
/// Creates [`BackwardUserIterator`] with maximum epoch.
pub(crate) fn for_test(iterator: I, key_range: UserKeyRange) -> Self {
Self::with_epoch(iterator, key_range, HummockEpoch::MAX, 0, None)
Self::new(iterator, key_range, HummockEpoch::MAX, 0, None)
}

/// Creates [`BackwardUserIterator`] with maximum epoch.
Expand All @@ -286,7 +285,7 @@ impl<I: HummockIterator<Direction = Backward>> BackwardUserIterator<I> {
key_range: UserKeyRange,
min_epoch: HummockEpoch,
) -> Self {
Self::with_epoch(iterator, key_range, HummockEpoch::MAX, min_epoch, None)
Self::new(iterator, key_range, HummockEpoch::MAX, min_epoch, None)
}
}

Expand Down
110 changes: 110 additions & 0 deletions src/storage/src/hummock/iterator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,116 @@ impl<'a, B: RustIteratorBuilder> HummockIterator for FromRustIterator<'a, B> {
}
}

pub struct FromRustRevIterator<'a, B: RustIteratorBuilder> {
inner: &'a B::Iterable,
#[expect(clippy::type_complexity)]
iter: Option<(
RustIteratorOfBuilder<'a, B>,
TableKey<&'a [u8]>,
HummockValue<&'a [u8]>,
)>,
epoch: EpochWithGap,
table_id: TableId,
}

impl<'a, B: RustIteratorBuilder> FromRustRevIterator<'a, B> {
pub fn new(inner: &'a B::Iterable, epoch: EpochWithGap, table_id: TableId) -> Self {
Self {
inner,
iter: None,
epoch,
table_id,
}
}
}

impl<'a, B: RustIteratorBuilder> HummockIterator for FromRustRevIterator<'a, B> {
type Direction = Backward;

async fn next(&mut self) -> HummockResult<()> {
let (iter, key, value) = self.iter.as_mut().expect("should be valid");
if let Some((new_key, new_value)) = iter.next() {
*key = new_key;
*value = new_value;
} else {
self.iter = None;
}
Ok(())
}

fn key(&self) -> FullKey<&[u8]> {
let (_, key, _) = self.iter.as_ref().expect("should be valid");
FullKey {
epoch_with_gap: self.epoch,
user_key: UserKey {
table_id: self.table_id,
table_key: *key,
},
}
}

fn value(&self) -> HummockValue<&[u8]> {
let (_, _, value) = self.iter.as_ref().expect("should be valid");
*value
}

fn is_valid(&self) -> bool {
self.iter.is_some()
}

async fn rewind(&mut self) -> HummockResult<()> {
let mut iter = B::rewind(self.inner);
if let Some((key, value)) = iter.next() {
self.iter = Some((RustIteratorOfBuilder::Rewind(iter), key, value));
} else {
self.iter = None;
}
Ok(())
}

async fn seek<'b>(&'b mut self, key: FullKey<&'b [u8]>) -> HummockResult<()> {
if self.table_id > key.user_key.table_id {
// returns None when the range of self.table_id must not include the given key
self.iter = None;
return Ok(());
}
if self.table_id < key.user_key.table_id {
return self.rewind().await;
}
let mut iter = B::seek(self.inner, key.user_key.table_key);
match iter.next() {
Some((first_key, first_value)) => {
let first_full_key = FullKey {
epoch_with_gap: self.epoch,
user_key: UserKey {
table_id: self.table_id,
table_key: first_key,
},
};
if first_full_key > key {
// The semantic of `seek_fn` will ensure that `first_key` <= table_key of `key`.
// At the beginning we have checked that `self.table_id` <= table_id of `key`.
// Therefore, when `first_full_key` > `key`, the only possibility is that
// `first_key` == table_key of `key`, and `self.table_id` == table_id of `key`,
// the `self.epoch` > epoch of `key`.
assert_eq!(first_key, key.user_key.table_key);
}
self.iter = Some((RustIteratorOfBuilder::Seek(iter), first_key, first_value));
}
None => {
self.iter = None;
}
}
Ok(())
}

fn collect_local_statistic(&self, _stats: &mut StoreLocalStatistic) {}

fn value_meta(&self) -> ValueMeta {
ValueMeta::default()
}
}

#[derive(PartialEq, Eq, Debug)]
pub enum DirectionEnum {
Forward,
Expand Down
40 changes: 38 additions & 2 deletions src/storage/src/hummock/store/hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ use crate::hummock::observer_manager::HummockObserverNode;
use crate::hummock::utils::{validate_safe_epoch, wait_for_epoch};
use crate::hummock::write_limiter::{WriteLimiter, WriteLimiterRef};
use crate::hummock::{
HummockEpoch, HummockError, HummockResult, HummockStorageIterator, MemoryLimiter,
SstableObjectIdManager, SstableObjectIdManagerRef, SstableStoreRef,
HummockEpoch, HummockError, HummockResult, HummockStorageIterator, HummockStorageRevIterator,
MemoryLimiter, SstableObjectIdManager, SstableObjectIdManagerRef, SstableStoreRef,
};
use crate::mem_table::ImmutableMemtable;
use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics, StoreLocalStatistic};
Expand Down Expand Up @@ -284,6 +284,24 @@ impl HummockStorage {
.await
}

async fn rev_iter_inner(
&self,
key_range: TableKeyRange,
epoch: u64,
read_options: ReadOptions,
) -> StorageResult<HummockStorageRevIterator> {
let (key_range, read_version_tuple) = if read_options.read_version_from_backup {
self.build_read_version_tuple_from_backup(epoch, read_options.table_id, key_range)
.await?
} else {
self.build_read_version_tuple(epoch, read_options.table_id, key_range)?
};

self.hummock_version_reader
.rev_iter(key_range, epoch, read_options, read_version_tuple)
.await
}

async fn build_read_version_tuple_from_backup(
&self,
epoch: u64,
Expand Down Expand Up @@ -459,6 +477,7 @@ impl HummockStorage {
impl StateStoreRead for HummockStorage {
type ChangeLogIter = PanicStateStoreIter<StateStoreReadLogItem>;
type Iter = HummockStorageIterator;
type RevIter = HummockStorageRevIterator;

fn get(
&self,
Expand Down Expand Up @@ -486,6 +505,23 @@ impl StateStoreRead for HummockStorage {
self.iter_inner(key_range, epoch, read_options)
}

fn rev_iter(
&self,
key_range: TableKeyRange,
epoch: u64,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Self::RevIter>> + '_ {
let (l_vnode_inclusive, r_vnode_exclusive) = vnode_range(&key_range);
assert_eq!(
r_vnode_exclusive - l_vnode_inclusive,
1,
"read range {:?} for table {} iter contains more than one vnode",
key_range,
read_options.table_id
);
self.rev_iter_inner(key_range, epoch, read_options)
}

async fn iter_log(
&self,
_epoch_range: (u64, u64),
Expand Down
Loading

0 comments on commit f4b5eca

Please sign in to comment.