Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Mar 20, 2024
1 parent da97cfe commit b139383
Show file tree
Hide file tree
Showing 6 changed files with 284 additions and 284 deletions.
4 changes: 2 additions & 2 deletions src/storage/src/hummock/iterator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ pub use delete_range_iterator::{
};
use risingwave_common::catalog::TableId;
pub use skip_watermark::*;
use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;

use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
use crate::monitor::StoreLocalStatistic;

#[derive(Default)]
Expand Down Expand Up @@ -505,4 +505,4 @@ pub trait IteratorFactory {
fn add_staging_sst_iter(&mut self, sst: Self::SstableIteratorType);
fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType);
fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner<Self::SstableIteratorType>);
}
}
142 changes: 140 additions & 2 deletions src/storage/src/hummock/store/local_hummock_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::future::Future;
use std::iter::once;
use std::ops::Bound;
use std::sync::Arc;

Expand All @@ -30,7 +31,8 @@ use crate::error::StorageResult;
use crate::hummock::event_handler::hummock_event_handler::HummockEventSender;
use crate::hummock::event_handler::{HummockEvent, HummockReadVersionRef, LocalInstanceGuard};
use crate::hummock::iterator::{
ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, UserIterator,
Backward, ConcatIteratorInner, Forward, HummockIteratorUnion, IteratorFactory, MergeIterator,
UserIterator,
};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchIterator,
Expand All @@ -41,7 +43,7 @@ use crate::hummock::utils::{
ENABLE_SANITY_CHECK,
};
use crate::hummock::write_limiter::WriteLimiterRef;
use crate::hummock::{MemoryLimiter, SstableIterator};
use crate::hummock::{BackwardSstableIterator, MemoryLimiter, SstableIterator};
use crate::mem_table::{KeyOp, MemTable, MemTableHummockIterator};
use crate::monitor::{HummockStateStoreMetrics, IterLocalMetricsGuard, StoreLocalStatistic};
use crate::panic_store::PanicStateStoreIter;
Expand Down Expand Up @@ -589,6 +591,9 @@ impl LocalHummockStorage {
pub type StagingDataIterator = MergeIterator<
HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;
pub type StagingDataRevIterator = MergeIterator<
HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;
pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
HummockIteratorUnion<
Forward,
Expand All @@ -599,8 +604,19 @@ pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator<
>,
>;

// pub type HummockStorageRevIteratorPayloadInner<'a> = MergeIterator<
// HummockIteratorUnion<
// Backward,
// StagingDataRevIterator,
// BackwardSstableIterator,
// ConcatIteratorInner<BackwardSstableIterator>,
// MemTableHummockIterator<'a>,
// >,
// >;

pub type HummockStorageIterator = HummockStorageIteratorInner<'static>;
pub type LocalHummockStorageIterator<'a> = HummockStorageIteratorInner<'a>;
// pub type LocalHummockStorageRevIterator<'a> = HummockStorageRevIteratorInner<'a>;

pub struct HummockStorageIteratorInner<'a> {
inner: UserIterator<HummockStorageIteratorPayloadInner<'a>>,
Expand Down Expand Up @@ -646,3 +662,125 @@ impl<'a> Drop for HummockStorageIteratorInner<'a> {
.collect_local_statistic(&mut self.stats_guard.local_stats);
}
}

pub struct ForwardIteratorFactory {
non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
overlapping_iters: Vec<SstableIterator>,
staging_iters:
Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
pub fn new() -> Self {
Self {
non_overlapping_iters: vec![],
overlapping_iters: vec![],
staging_iters: vec![],
}
}

pub fn build<'a>(
self,
mem_table: Option<MemTableHummockIterator<'a>>,
) -> HummockStorageIteratorPayloadInner<'a> {
// 3. build user_iterator
let staging_iter = StagingDataIterator::new(self.staging_iters);
let merge_iter = MergeIterator::new(
once(HummockIteratorUnion::First(staging_iter))
.chain(
self.overlapping_iters
.into_iter()
.map(HummockIteratorUnion::Second),
)
.chain(
self.non_overlapping_iters
.into_iter()
.map(HummockIteratorUnion::Third),
)
.chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
);
merge_iter
}
}

impl IteratorFactory for ForwardIteratorFactory {
type Direction = Forward;
type SstableIteratorType = SstableIterator;

fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
self.staging_iters
.push(HummockIteratorUnion::First(batch.into_forward_iter()));
}

fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
self.staging_iters.push(HummockIteratorUnion::Second(iter));
}

fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
self.overlapping_iters.push(iter);
}

fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner<Self::SstableIteratorType>) {
self.non_overlapping_iters.push(iter);
}
}

// pub struct BackwardIteratorFactory {
// non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
// overlapping_iters: Vec<BackwardSstableIterator>,
// staging_iters: Vec<HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>>,
// }
//
// impl BackwardIteratorFactory {
// pub fn new() -> Self {
// Self {
// non_overlapping_iters: vec![],
// overlapping_iters: vec![],
// staging_iters: vec![],
// }
// }
//
// pub fn build<'a>(
// self,
// mem_table: Option<MemTableHummockIterator<'a>>,
// ) -> HummockStorageRevIteratorPayloadInner<'a> {
// // 3. build user_iterator
// let staging_iter = StagingDataRevIterator::new(self.staging_iters);
// let merge_iter = MergeIterator::new(
// once(HummockIteratorUnion::First(staging_iter))
// .chain(
// self.overlapping_iters
// .into_iter()
// .map(HummockIteratorUnion::Second),
// )
// .chain(
// self.non_overlapping_iters
// .into_iter()
// .map(HummockIteratorUnion::Third),
// ).chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)),
// );
// merge_iter
// }
// }
//
// impl IteratorFactory for BackwardIteratorFactory {
// type Direction = Backward;
// type SstableIteratorType = BackwardSstableIterator;
//
// fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
// self.staging_iters
// .push(HummockIteratorUnion::First(batch.into_backward_iter()));
// }
//
// fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
// self.staging_iters.push(HummockIteratorUnion::Second(iter));
// }
//
// fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
// self.overlapping_iters.push(iter);
// }
//
// fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner<Self::SstableIteratorType>) {
// self.non_overlapping_iters.push(iter);
// }
// }
160 changes: 0 additions & 160 deletions src/storage/src/hummock/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,163 +18,3 @@ pub mod version;

pub use hummock_storage::*;
pub use local_hummock_storage::*;

use risingwave_hummock_sdk::HummockEpoch;

use crate::hummock::iterator::{
Backward, BackwardMergeRangeIterator, ConcatIteratorInner, Forward, ForwardMergeRangeIterator,
HummockIteratorUnion, IteratorFactory, OrderedMergeIteratorInner, UnorderedMergeIteratorInner,
};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchIterator,
};
use crate::hummock::{BackwardSstableIterator, SstableIterator};

pub type StagingDataIterator = OrderedMergeIteratorInner<
HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;

pub type BackwardStagingDataIterator = OrderedMergeIteratorInner<
HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;

pub type ForwardUnionIterator = HummockIteratorUnion<
Forward,
StagingDataIterator,
SstableIterator,
ConcatIteratorInner<SstableIterator>,
>;

pub type BackwardUnionIterator = HummockIteratorUnion<
Backward,
BackwardStagingDataIterator,
BackwardSstableIterator,
ConcatIteratorInner<BackwardSstableIterator>,
>;

pub type HummockStorageIteratorPayload = UnorderedMergeIteratorInner<ForwardUnionIterator>;

pub struct ForwardIteratorFactory {
delete_range_iter: ForwardMergeRangeIterator,
non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
overlapping_iters: Vec<SstableIterator>,
staging_iters:
Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
pub fn new(read_epoch: HummockEpoch) -> Self {
Self {
delete_range_iter: ForwardMergeRangeIterator::new(read_epoch),
non_overlapping_iters: vec![],
overlapping_iters: vec![],
staging_iters: vec![],
}
}

pub fn build(self) -> (HummockStorageIteratorPayload, ForwardMergeRangeIterator) {
// 3. build user_iterator
let staging_iter = StagingDataIterator::new(self.staging_iters);
let merge_iter = UnorderedMergeIteratorInner::new(
once(HummockIteratorUnion::First(staging_iter))
.chain(
self.overlapping_iters
.into_iter()
.map(HummockIteratorUnion::Second),
)
.chain(
self.non_overlapping_iters
.into_iter()
.map(HummockIteratorUnion::Third),
),
);
(merge_iter, self.delete_range_iter)
}
}

impl IteratorFactory for ForwardIteratorFactory {
type Direction = Forward;
type SstableIteratorType = SstableIterator;

fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
self.staging_iters
.push(HummockIteratorUnion::First(batch.into_forward_iter()));
}

fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
self.staging_iters.push(HummockIteratorUnion::Second(iter));
}

fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
self.overlapping_iters.push(iter);
}

fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner<Self::SstableIteratorType>) {
self.non_overlapping_iters.push(iter);
}
}

pub struct BackwardIteratorFactory {
non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
overlapping_iters: Vec<BackwardSstableIterator>,
staging_iters: Vec<
HummockIteratorUnion<
Backward,
SharedBufferBatchIterator<Backward>,
BackwardSstableIterator,
>,
>,
}

impl BackwardIteratorFactory {
pub fn new() -> Self {
Self {
non_overlapping_iters: vec![],
overlapping_iters: vec![],
staging_iters: vec![],
}
}

pub fn build(
self,
) -> UnorderedMergeIteratorInner<BackwardUnionIterator> {
// 3. build user_iterator
let staging_iter = BackwardStagingDataIterator::new(self.staging_iters);
let merge_iter = UnorderedMergeIteratorInner::new(
once(HummockIteratorUnion::First(staging_iter))
.chain(
self.overlapping_iters
.into_iter()
.map(HummockIteratorUnion::Second),
)
.chain(
self.non_overlapping_iters
.into_iter()
.map(HummockIteratorUnion::Third),
),
);
merge_iter
}
}

impl IteratorFactory for BackwardIteratorFactory {
type Direction = Backward;
type SstableIteratorType = BackwardSstableIterator;

fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
self.staging_iters
.push(HummockIteratorUnion::First(batch.into_backward_iter()));
}

fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
self.staging_iters.push(HummockIteratorUnion::Second(iter));
}

fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
self.overlapping_iters.push(iter);
}

fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner<Self::SstableIteratorType>) {
self.non_overlapping_iters.push(iter);
}
}
Loading

0 comments on commit b139383

Please sign in to comment.