From da97cfebea78009070b01b7818d6d78e4cae6351 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 20 Mar 2024 11:16:35 +0800 Subject: [PATCH] tmp commit Signed-off-by: Little-Wallace --- src/storage/src/hummock/iterator/mod.rs | 12 +- src/storage/src/hummock/store/mod.rs | 160 +++++++++++++++++++++++ src/storage/src/hummock/store/version.rs | 8 +- 3 files changed, 175 insertions(+), 5 deletions(-) diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index 01807a451108e..e03fe3e6260d3 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -18,7 +18,7 @@ use std::ops::{Deref, DerefMut}; use more_asserts::assert_gt; -use super::{HummockResult, HummockValue}; +use super::{HummockResult, HummockValue, SstableIteratorType}; mod forward_concat; pub use forward_concat::*; @@ -51,6 +51,7 @@ pub use delete_range_iterator::{ }; use risingwave_common::catalog::TableId; pub use skip_watermark::*; +use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch; use crate::monitor::StoreLocalStatistic; @@ -496,3 +497,12 @@ impl HummockIteratorDirection for Backward { DirectionEnum::Backward } } + +pub trait IteratorFactory { + type Direction: HummockIteratorDirection; + type SstableIteratorType: SstableIteratorType; + fn add_batch_iter(&mut self, batch: SharedBufferBatch); + fn add_staging_sst_iter(&mut self, sst: Self::SstableIteratorType); + fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType); + fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner); +} \ No newline at end of file diff --git a/src/storage/src/hummock/store/mod.rs b/src/storage/src/hummock/store/mod.rs index f3e13e2519b53..358b56fe2955e 100644 --- a/src/storage/src/hummock/store/mod.rs +++ b/src/storage/src/hummock/store/mod.rs @@ -18,3 +18,163 @@ pub mod version; pub use hummock_storage::*; pub use local_hummock_storage::*; + +use risingwave_hummock_sdk::HummockEpoch; + +use crate::hummock::iterator::{ + Backward, BackwardMergeRangeIterator, ConcatIteratorInner, Forward, ForwardMergeRangeIterator, + HummockIteratorUnion, IteratorFactory, OrderedMergeIteratorInner, UnorderedMergeIteratorInner, +}; +use crate::hummock::shared_buffer::shared_buffer_batch::{ + SharedBufferBatch, SharedBufferBatchIterator, +}; +use crate::hummock::{BackwardSstableIterator, SstableIterator}; + +pub type StagingDataIterator = OrderedMergeIteratorInner< + HummockIteratorUnion, SstableIterator>, +>; + +pub type BackwardStagingDataIterator = OrderedMergeIteratorInner< + HummockIteratorUnion, BackwardSstableIterator>, +>; + +pub type ForwardUnionIterator = HummockIteratorUnion< + Forward, + StagingDataIterator, + SstableIterator, + ConcatIteratorInner, +>; + +pub type BackwardUnionIterator = HummockIteratorUnion< + Backward, + BackwardStagingDataIterator, + BackwardSstableIterator, + ConcatIteratorInner, +>; + +pub type HummockStorageIteratorPayload = UnorderedMergeIteratorInner; + +pub struct ForwardIteratorFactory { + delete_range_iter: ForwardMergeRangeIterator, + non_overlapping_iters: Vec>, + overlapping_iters: Vec, + staging_iters: + Vec, SstableIterator>>, +} + +impl ForwardIteratorFactory { + pub fn new(read_epoch: HummockEpoch) -> Self { + Self { + delete_range_iter: ForwardMergeRangeIterator::new(read_epoch), + non_overlapping_iters: vec![], + overlapping_iters: vec![], + staging_iters: vec![], + } + } + + pub fn build(self) -> (HummockStorageIteratorPayload, ForwardMergeRangeIterator) { + // 3. build user_iterator + let staging_iter = StagingDataIterator::new(self.staging_iters); + let merge_iter = UnorderedMergeIteratorInner::new( + once(HummockIteratorUnion::First(staging_iter)) + .chain( + self.overlapping_iters + .into_iter() + .map(HummockIteratorUnion::Second), + ) + .chain( + self.non_overlapping_iters + .into_iter() + .map(HummockIteratorUnion::Third), + ), + ); + (merge_iter, self.delete_range_iter) + } +} + +impl IteratorFactory for ForwardIteratorFactory { + type Direction = Forward; + type SstableIteratorType = SstableIterator; + + fn add_batch_iter(&mut self, batch: SharedBufferBatch) { + self.staging_iters + .push(HummockIteratorUnion::First(batch.into_forward_iter())); + } + + fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) { + self.staging_iters.push(HummockIteratorUnion::Second(iter)); + } + + fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) { + self.overlapping_iters.push(iter); + } + + fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner) { + self.non_overlapping_iters.push(iter); + } +} + +pub struct BackwardIteratorFactory { + non_overlapping_iters: Vec>, + overlapping_iters: Vec, + staging_iters: Vec< + HummockIteratorUnion< + Backward, + SharedBufferBatchIterator, + BackwardSstableIterator, + >, + >, +} + +impl BackwardIteratorFactory { + pub fn new() -> Self { + Self { + non_overlapping_iters: vec![], + overlapping_iters: vec![], + staging_iters: vec![], + } + } + + pub fn build( + self, + ) -> UnorderedMergeIteratorInner { + // 3. build user_iterator + let staging_iter = BackwardStagingDataIterator::new(self.staging_iters); + let merge_iter = UnorderedMergeIteratorInner::new( + once(HummockIteratorUnion::First(staging_iter)) + .chain( + self.overlapping_iters + .into_iter() + .map(HummockIteratorUnion::Second), + ) + .chain( + self.non_overlapping_iters + .into_iter() + .map(HummockIteratorUnion::Third), + ), + ); + merge_iter + } +} + +impl IteratorFactory for BackwardIteratorFactory { + type Direction = Backward; + type SstableIteratorType = BackwardSstableIterator; + + fn add_batch_iter(&mut self, batch: SharedBufferBatch) { + self.staging_iters + .push(HummockIteratorUnion::First(batch.into_backward_iter())); + } + + fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) { + self.staging_iters.push(HummockIteratorUnion::Second(iter)); + } + + fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) { + self.overlapping_iters.push(iter); + } + + fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner) { + self.non_overlapping_iters.push(iter); + } +} \ No newline at end of file diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 232b55dd1212c..572a633e4f9d0 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -766,14 +766,14 @@ impl HummockVersionReader { .await } - pub async fn iter_inner<'a, 'b>( - &'a self, + pub async fn iter_inner( + &self, table_key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, read_version_tuple: ReadVersionTuple, - mem_table: Option>, - ) -> StorageResult> { + factory: &mut F, + ) -> StorageResult<()> { let (imms, uncommitted_ssts, committed) = read_version_tuple; let mut local_stats = StoreLocalStatistic::default();