Skip to content

Commit

Permalink
tmp commit
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Mar 20, 2024
1 parent 1ccb354 commit da97cfe
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 5 deletions.
12 changes: 11 additions & 1 deletion src/storage/src/hummock/iterator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::ops::{Deref, DerefMut};

use more_asserts::assert_gt;

use super::{HummockResult, HummockValue};
use super::{HummockResult, HummockValue, SstableIteratorType};

mod forward_concat;
pub use forward_concat::*;
Expand Down Expand Up @@ -51,6 +51,7 @@ pub use delete_range_iterator::{
};
use risingwave_common::catalog::TableId;
pub use skip_watermark::*;
use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;

use crate::monitor::StoreLocalStatistic;

Expand Down Expand Up @@ -496,3 +497,12 @@ impl HummockIteratorDirection for Backward {
DirectionEnum::Backward
}
}

pub trait IteratorFactory {
type Direction: HummockIteratorDirection;
type SstableIteratorType: SstableIteratorType<Direction = Self::Direction>;
fn add_batch_iter(&mut self, batch: SharedBufferBatch);
fn add_staging_sst_iter(&mut self, sst: Self::SstableIteratorType);
fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType);
fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner<Self::SstableIteratorType>);
}
160 changes: 160 additions & 0 deletions src/storage/src/hummock/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,163 @@ pub mod version;

pub use hummock_storage::*;
pub use local_hummock_storage::*;

use risingwave_hummock_sdk::HummockEpoch;

use crate::hummock::iterator::{
Backward, BackwardMergeRangeIterator, ConcatIteratorInner, Forward, ForwardMergeRangeIterator,
HummockIteratorUnion, IteratorFactory, OrderedMergeIteratorInner, UnorderedMergeIteratorInner,
};
use crate::hummock::shared_buffer::shared_buffer_batch::{
SharedBufferBatch, SharedBufferBatchIterator,
};
use crate::hummock::{BackwardSstableIterator, SstableIterator};

pub type StagingDataIterator = OrderedMergeIteratorInner<
HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>,
>;

pub type BackwardStagingDataIterator = OrderedMergeIteratorInner<
HummockIteratorUnion<Backward, SharedBufferBatchIterator<Backward>, BackwardSstableIterator>,
>;

pub type ForwardUnionIterator = HummockIteratorUnion<
Forward,
StagingDataIterator,
SstableIterator,
ConcatIteratorInner<SstableIterator>,
>;

pub type BackwardUnionIterator = HummockIteratorUnion<
Backward,
BackwardStagingDataIterator,
BackwardSstableIterator,
ConcatIteratorInner<BackwardSstableIterator>,
>;

pub type HummockStorageIteratorPayload = UnorderedMergeIteratorInner<ForwardUnionIterator>;

pub struct ForwardIteratorFactory {
delete_range_iter: ForwardMergeRangeIterator,
non_overlapping_iters: Vec<ConcatIteratorInner<SstableIterator>>,
overlapping_iters: Vec<SstableIterator>,
staging_iters:
Vec<HummockIteratorUnion<Forward, SharedBufferBatchIterator<Forward>, SstableIterator>>,
}

impl ForwardIteratorFactory {
pub fn new(read_epoch: HummockEpoch) -> Self {
Self {
delete_range_iter: ForwardMergeRangeIterator::new(read_epoch),
non_overlapping_iters: vec![],
overlapping_iters: vec![],
staging_iters: vec![],
}
}

pub fn build(self) -> (HummockStorageIteratorPayload, ForwardMergeRangeIterator) {
// 3. build user_iterator
let staging_iter = StagingDataIterator::new(self.staging_iters);
let merge_iter = UnorderedMergeIteratorInner::new(
once(HummockIteratorUnion::First(staging_iter))
.chain(
self.overlapping_iters
.into_iter()
.map(HummockIteratorUnion::Second),
)
.chain(
self.non_overlapping_iters
.into_iter()
.map(HummockIteratorUnion::Third),
),
);
(merge_iter, self.delete_range_iter)
}
}

impl IteratorFactory for ForwardIteratorFactory {
type Direction = Forward;
type SstableIteratorType = SstableIterator;

fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
self.staging_iters
.push(HummockIteratorUnion::First(batch.into_forward_iter()));
}

fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
self.staging_iters.push(HummockIteratorUnion::Second(iter));
}

fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
self.overlapping_iters.push(iter);
}

fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner<Self::SstableIteratorType>) {
self.non_overlapping_iters.push(iter);
}
}

pub struct BackwardIteratorFactory {
non_overlapping_iters: Vec<ConcatIteratorInner<BackwardSstableIterator>>,
overlapping_iters: Vec<BackwardSstableIterator>,
staging_iters: Vec<
HummockIteratorUnion<
Backward,
SharedBufferBatchIterator<Backward>,
BackwardSstableIterator,
>,
>,
}

impl BackwardIteratorFactory {
pub fn new() -> Self {
Self {
non_overlapping_iters: vec![],
overlapping_iters: vec![],
staging_iters: vec![],
}
}

pub fn build(
self,
) -> UnorderedMergeIteratorInner<BackwardUnionIterator> {
// 3. build user_iterator
let staging_iter = BackwardStagingDataIterator::new(self.staging_iters);
let merge_iter = UnorderedMergeIteratorInner::new(
once(HummockIteratorUnion::First(staging_iter))
.chain(
self.overlapping_iters
.into_iter()
.map(HummockIteratorUnion::Second),
)
.chain(
self.non_overlapping_iters
.into_iter()
.map(HummockIteratorUnion::Third),
),
);
merge_iter
}
}

impl IteratorFactory for BackwardIteratorFactory {
type Direction = Backward;
type SstableIteratorType = BackwardSstableIterator;

fn add_batch_iter(&mut self, batch: SharedBufferBatch) {
self.staging_iters
.push(HummockIteratorUnion::First(batch.into_backward_iter()));
}

fn add_staging_sst_iter(&mut self, iter: Self::SstableIteratorType) {
self.staging_iters.push(HummockIteratorUnion::Second(iter));
}

fn add_overlapping_sst_iter(&mut self, iter: Self::SstableIteratorType) {
self.overlapping_iters.push(iter);
}

fn add_concat_sst_iter(&mut self, iter: ConcatIteratorInner<Self::SstableIteratorType>) {
self.non_overlapping_iters.push(iter);
}
}
8 changes: 4 additions & 4 deletions src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,14 +766,14 @@ impl HummockVersionReader {
.await
}

pub async fn iter_inner<'a, 'b>(
&'a self,
pub async fn iter_inner<F: IteratorFactory>(
&self,
table_key_range: TableKeyRange,
epoch: u64,
read_options: ReadOptions,
read_version_tuple: ReadVersionTuple,
mem_table: Option<MemTableHummockIterator<'b>>,
) -> StorageResult<HummockStorageIteratorInner<'b>> {
factory: &mut F,
) -> StorageResult<()> {
let (imms, uncommitted_ssts, committed) = read_version_tuple;

let mut local_stats = StoreLocalStatistic::default();
Expand Down

0 comments on commit da97cfe

Please sign in to comment.