diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 1a5f8c94cd110..29d2696d8323b 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -31,8 +31,7 @@ use risingwave_storage::hummock::compactor::{ ConcatSstableIterator, DummyCompactionFilter, TaskConfig, TaskProgress, }; use risingwave_storage::hummock::iterator::{ - ConcatIterator, Forward, ForwardMergeRangeIterator, HummockIterator, - UnorderedMergeIteratorInner, + ConcatIterator, Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, }; use risingwave_storage::hummock::multi_builder::{ CapacitySplitTableBuilder, LocalTableBuilderFactory, @@ -239,7 +238,7 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) { ConcatIterator::new(level1.clone(), sstable_store.clone(), read_options.clone()), ConcatIterator::new(level2.clone(), sstable_store.clone(), read_options.clone()), ]; - let iter = UnorderedMergeIteratorInner::for_compactor(sub_iters); + let iter = MergeIterator::for_compactor(sub_iters); async move { compact(iter, sstable_store1).await } }); }); @@ -263,7 +262,7 @@ fn bench_merge_iterator_compactor(c: &mut Criterion) { 0, ), ]; - let iter = UnorderedMergeIteratorInner::for_compactor(sub_iters); + let iter = MergeIterator::for_compactor(sub_iters); let sstable_store1 = sstable_store.clone(); async move { compact(iter, sstable_store1).await } }); diff --git a/src/storage/benches/bench_merge_iter.rs b/src/storage/benches/bench_merge_iter.rs index 1e44d80279d01..c7b42f1894265 100644 --- a/src/storage/benches/bench_merge_iter.rs +++ b/src/storage/benches/bench_merge_iter.rs @@ -20,8 +20,7 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use futures::executor::block_on; use risingwave_hummock_sdk::key::TableKey; use risingwave_storage::hummock::iterator::{ - Forward, HummockIterator, HummockIteratorUnion, OrderedMergeIteratorInner, - SkipWatermarkIterator, UnorderedMergeIteratorInner, + Forward, HummockIterator, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator, }; use risingwave_storage::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferBatchIterator, @@ -97,23 +96,9 @@ fn run_iter>(iter_ref: &RefCell, tota } fn criterion_benchmark(c: &mut Criterion) { - let ordered_merge_iter = RefCell::new(OrderedMergeIteratorInner::new( - gen_interleave_shared_buffer_batch_iter(10000, 100), - )); - - c.bench_with_input( - BenchmarkId::new("bench-merge-iter", "ordered"), - &ordered_merge_iter, - |b, iter_ref| { - b.iter(|| { - run_iter(iter_ref, 100 * 10000); - }); - }, - ); - - let merge_iter = RefCell::new(UnorderedMergeIteratorInner::new( - gen_interleave_shared_buffer_batch_iter(10000, 100), - )); + let merge_iter = RefCell::new(MergeIterator::new(gen_interleave_shared_buffer_batch_iter( + 10000, 100, + ))); c.bench_with_input( BenchmarkId::new("bench-merge-iter", "unordered"), &merge_iter, @@ -125,7 +110,7 @@ fn criterion_benchmark(c: &mut Criterion) { ); let merge_iter = RefCell::new(SkipWatermarkIterator::new( - UnorderedMergeIteratorInner::new(gen_interleave_shared_buffer_batch_iter(10000, 100)), + MergeIterator::new(gen_interleave_shared_buffer_batch_iter(10000, 100)), BTreeMap::new(), )); c.bench_with_input( @@ -138,7 +123,7 @@ fn criterion_benchmark(c: &mut Criterion) { }, ); - let merge_iter = RefCell::new(UnorderedMergeIteratorInner::new( + let merge_iter = RefCell::new(MergeIterator::new( gen_interleave_shared_buffer_batch_enum_iter(10000, 100), )); c.bench_with_input( 
@@ -150,20 +135,6 @@ fn criterion_benchmark(c: &mut Criterion) { }); }, ); - - let ordered_merge_iter = RefCell::new(OrderedMergeIteratorInner::new( - gen_interleave_shared_buffer_batch_enum_iter(10000, 100), - )); - - c.bench_with_input( - BenchmarkId::new("bench-enum-merge-iter", "ordered"), - &ordered_merge_iter, - |b, iter_ref| { - b.iter(|| { - run_iter(iter_ref, 100 * 10000); - }); - }, - ); } criterion_group!(benches, criterion_benchmark); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index d146e058d5b2d..d46831b8d30c0 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -22,7 +22,6 @@ use futures::{pin_mut, StreamExt, TryStreamExt}; use risingwave_common::cache::CachePriority; use risingwave_common::catalog::{TableId, TableOption}; use risingwave_common::hash::VirtualNode; -use risingwave_hummock_sdk::key::FullKey; use risingwave_hummock_sdk::{ HummockEpoch, HummockReadEpoch, HummockSstableObjectId, LocalSstableInfo, }; @@ -713,307 +712,308 @@ async fn test_reload_storage() { assert_eq!(len, 3); } -#[tokio::test] -async fn test_write_anytime_v2() { - let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; - test_write_anytime_inner(hummock_storage, meta_client).await; -} - -async fn test_write_anytime_inner( - hummock_storage: impl HummockStateStoreTestTrait, - _meta_client: Arc, -) { - let initial_epoch = hummock_storage.get_pinned_version().max_committed_epoch(); - - let epoch1 = initial_epoch + 1; - - let assert_old_value = |epoch| { - let hummock_storage = &hummock_storage; - async move { - // check point get - assert_eq!( - "111".as_bytes(), - hummock_storage - .get( - gen_key_from_str(VirtualNode::ZERO, "aa"), - epoch, - ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), - ..Default::default() - } - ) - .await - .unwrap() - .unwrap() - ); - assert_eq!( - "222".as_bytes(), - hummock_storage - .get( - gen_key_from_str(VirtualNode::ZERO, "bb"), - epoch, - ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), - ..Default::default() - } - ) - .await - .unwrap() - .unwrap() - ); - assert_eq!( - "333".as_bytes(), - hummock_storage - .get( - gen_key_from_str(VirtualNode::ZERO, "cc"), - epoch, - ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), - ..Default::default() - } - ) - .await - .unwrap() - .unwrap() - ); - // check iter - let iter = hummock_storage - .iter( - ( - Bound::Included(gen_key_from_str(VirtualNode::ZERO, "aa")), - Bound::Included(gen_key_from_str(VirtualNode::ZERO, "cc")), - ), - epoch, - ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), - ..Default::default() - }, - ) - .await - .unwrap(); - futures::pin_mut!(iter); - assert_eq!( - ( - FullKey::new( - TableId::default(), - gen_key_from_str(VirtualNode::ZERO, "aa"), - epoch - ), - Bytes::from("111") - ), - iter.try_next().await.unwrap().unwrap() - ); - assert_eq!( - ( - FullKey::new( - TableId::default(), - gen_key_from_str(VirtualNode::ZERO, "bb"), - epoch - ), - Bytes::from("222") - ), - iter.try_next().await.unwrap().unwrap() - ); - assert_eq!( - ( - FullKey::new( - TableId::default(), - gen_key_from_str(VirtualNode::ZERO, "cc"), - epoch - ), - Bytes::from("333") - ), - iter.try_next().await.unwrap().unwrap() - ); - assert!(iter.try_next().await.unwrap().is_none()); - } - }; - - let batch1 = vec![ - ( - gen_key_from_str(VirtualNode::ZERO, "aa"), - 
StorageValue::new_put("111"), - ), - ( - gen_key_from_str(VirtualNode::ZERO, "bb"), - StorageValue::new_put("222"), - ), - ( - gen_key_from_str(VirtualNode::ZERO, "cc"), - StorageValue::new_put("333"), - ), - ]; - - let mut local = hummock_storage.new_local(NewLocalOptions::default()).await; - local.init_for_test(epoch1).await.unwrap(); - - local - .ingest_batch( - batch1.clone(), - vec![], - WriteOptions { - epoch: epoch1, - table_id: Default::default(), - }, - ) - .await - .unwrap(); - assert_old_value(epoch1).await; - - let assert_new_value = |epoch| { - let hummock_storage = &hummock_storage; - async move { - // check point get - assert_eq!( - "111_new".as_bytes(), - hummock_storage - .get( - gen_key_from_str(VirtualNode::ZERO, "aa"), - epoch, - ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), - ..Default::default() - } - ) - .await - .unwrap() - .unwrap() - ); - - assert!(hummock_storage - .get( - gen_key_from_str(VirtualNode::ZERO, "bb"), - epoch, - ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), - ..Default::default() - } - ) - .await - .unwrap() - .is_none()); - assert_eq!( - "333".as_bytes(), - hummock_storage - .get( - gen_key_from_str(VirtualNode::ZERO, "cc"), - epoch, - ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), - ..Default::default() - } - ) - .await - .unwrap() - .unwrap() - ); - let iter = hummock_storage - .iter( - ( - Bound::Included(gen_key_from_str(VirtualNode::ZERO, "aa")), - Bound::Included(gen_key_from_str(VirtualNode::ZERO, "cc")), - ), - epoch, - ReadOptions { - cache_policy: CachePolicy::Fill(CachePriority::High), - ..Default::default() - }, - ) - .await - .unwrap(); - futures::pin_mut!(iter); - assert_eq!( - ( - FullKey::new( - TableId::default(), - gen_key_from_str(VirtualNode::ZERO, "aa"), - epoch - ), - Bytes::from("111_new") - ), - iter.try_next().await.unwrap().unwrap() - ); - assert_eq!( - ( - FullKey::new( - TableId::default(), - gen_key_from_str(VirtualNode::ZERO, "cc"), - epoch - ), - Bytes::from("333") - ), - iter.try_next().await.unwrap().unwrap() - ); - assert!(iter.try_next().await.unwrap().is_none()); - } - }; - - // Update aa, delete bb, cc unchanged - let batch2 = vec![ - ( - gen_key_from_str(VirtualNode::ZERO, "aa"), - StorageValue::new_put("111_new"), - ), - ( - gen_key_from_str(VirtualNode::ZERO, "bb"), - StorageValue::new_delete(), - ), - ]; - - local - .ingest_batch( - batch2, - vec![], - WriteOptions { - epoch: epoch1, - table_id: Default::default(), - }, - ) - .await - .unwrap(); - - assert_new_value(epoch1).await; - - let epoch2 = epoch1 + 1; - local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); - - // Write to epoch2 - local - .ingest_batch( - batch1, - vec![], - WriteOptions { - epoch: epoch2, - table_id: Default::default(), - }, - ) - .await - .unwrap(); - local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); - // Assert epoch 1 unchanged - assert_new_value(epoch1).await; - // Assert epoch 2 correctness - assert_old_value(epoch2).await; - - let ssts1 = hummock_storage - .seal_and_sync_epoch(epoch1) - .await - .unwrap() - .uncommitted_ssts; - assert_new_value(epoch1).await; - assert_old_value(epoch2).await; - - let ssts2 = hummock_storage - .seal_and_sync_epoch(epoch2) - .await - .unwrap() - .uncommitted_ssts; - assert_new_value(epoch1).await; - assert_old_value(epoch2).await; - - assert!(!ssts1.is_empty()); - assert!(!ssts2.is_empty()); -} +// Keep this test case's codes for future reference +// #[tokio::test] +// async fn 
test_write_anytime_v2() { +// let (hummock_storage, meta_client) = with_hummock_storage_v2(Default::default()).await; +// test_write_anytime_inner(hummock_storage, meta_client).await; +// } + +// async fn test_write_anytime_inner( +// hummock_storage: impl HummockStateStoreTestTrait, +// _meta_client: Arc, +// ) { +// let initial_epoch = hummock_storage.get_pinned_version().max_committed_epoch(); + +// let epoch1 = initial_epoch + 1; + +// let assert_old_value = |epoch| { +// let hummock_storage = &hummock_storage; +// async move { +// // check point get +// assert_eq!( +// "111".as_bytes(), +// hummock_storage +// .get( +// gen_key_from_str(VirtualNode::ZERO, "aa"), +// epoch, +// ReadOptions { +// cache_policy: CachePolicy::Fill(CachePriority::High), +// ..Default::default() +// } +// ) +// .await +// .unwrap() +// .unwrap() +// ); +// assert_eq!( +// "222".as_bytes(), +// hummock_storage +// .get( +// gen_key_from_str(VirtualNode::ZERO, "bb"), +// epoch, +// ReadOptions { +// cache_policy: CachePolicy::Fill(CachePriority::High), +// ..Default::default() +// } +// ) +// .await +// .unwrap() +// .unwrap() +// ); +// assert_eq!( +// "333".as_bytes(), +// hummock_storage +// .get( +// gen_key_from_str(VirtualNode::ZERO, "cc"), +// epoch, +// ReadOptions { +// cache_policy: CachePolicy::Fill(CachePriority::High), +// ..Default::default() +// } +// ) +// .await +// .unwrap() +// .unwrap() +// ); +// // check iter +// let iter = hummock_storage +// .iter( +// ( +// Bound::Included(gen_key_from_str(VirtualNode::ZERO, "aa")), +// Bound::Included(gen_key_from_str(VirtualNode::ZERO, "cc")), +// ), +// epoch, +// ReadOptions { +// cache_policy: CachePolicy::Fill(CachePriority::High), +// ..Default::default() +// }, +// ) +// .await +// .unwrap(); +// futures::pin_mut!(iter); +// assert_eq!( +// ( +// FullKey::new( +// TableId::default(), +// gen_key_from_str(VirtualNode::ZERO, "aa"), +// epoch +// ), +// Bytes::from("111") +// ), +// iter.try_next().await.unwrap().unwrap() +// ); +// assert_eq!( +// ( +// FullKey::new( +// TableId::default(), +// gen_key_from_str(VirtualNode::ZERO, "bb"), +// epoch +// ), +// Bytes::from("222") +// ), +// iter.try_next().await.unwrap().unwrap() +// ); +// assert_eq!( +// ( +// FullKey::new( +// TableId::default(), +// gen_key_from_str(VirtualNode::ZERO, "cc"), +// epoch +// ), +// Bytes::from("333") +// ), +// iter.try_next().await.unwrap().unwrap() +// ); +// assert!(iter.try_next().await.unwrap().is_none()); +// } +// }; + +// let batch1 = vec![ +// ( +// gen_key_from_str(VirtualNode::ZERO, "aa"), +// StorageValue::new_put("111"), +// ), +// ( +// gen_key_from_str(VirtualNode::ZERO, "bb"), +// StorageValue::new_put("222"), +// ), +// ( +// gen_key_from_str(VirtualNode::ZERO, "cc"), +// StorageValue::new_put("333"), +// ), +// ]; + +// let mut local = hummock_storage.new_local(NewLocalOptions::default()).await; +// local.init_for_test(epoch1).await.unwrap(); + +// local +// .ingest_batch( +// batch1.clone(), +// vec![], +// WriteOptions { +// epoch: epoch1, +// table_id: Default::default(), +// }, +// ) +// .await +// .unwrap(); +// assert_old_value(epoch1).await; + +// let assert_new_value = |epoch| { +// let hummock_storage = &hummock_storage; +// async move { +// // check point get +// assert_eq!( +// "111_new".as_bytes(), +// hummock_storage +// .get( +// gen_key_from_str(VirtualNode::ZERO, "aa"), +// epoch, +// ReadOptions { +// cache_policy: CachePolicy::Fill(CachePriority::High), +// ..Default::default() +// } +// ) +// .await +// .unwrap() +// .unwrap() 
+// ); + +// assert!(hummock_storage +// .get( +// gen_key_from_str(VirtualNode::ZERO, "bb"), +// epoch, +// ReadOptions { +// cache_policy: CachePolicy::Fill(CachePriority::High), +// ..Default::default() +// } +// ) +// .await +// .unwrap() +// .is_none()); +// assert_eq!( +// "333".as_bytes(), +// hummock_storage +// .get( +// gen_key_from_str(VirtualNode::ZERO, "cc"), +// epoch, +// ReadOptions { +// cache_policy: CachePolicy::Fill(CachePriority::High), +// ..Default::default() +// } +// ) +// .await +// .unwrap() +// .unwrap() +// ); +// let iter = hummock_storage +// .iter( +// ( +// Bound::Included(gen_key_from_str(VirtualNode::ZERO, "aa")), +// Bound::Included(gen_key_from_str(VirtualNode::ZERO, "cc")), +// ), +// epoch, +// ReadOptions { +// cache_policy: CachePolicy::Fill(CachePriority::High), +// ..Default::default() +// }, +// ) +// .await +// .unwrap(); +// futures::pin_mut!(iter); +// assert_eq!( +// ( +// FullKey::new( +// TableId::default(), +// gen_key_from_str(VirtualNode::ZERO, "aa"), +// epoch +// ), +// Bytes::from("111_new") +// ), +// iter.try_next().await.unwrap().unwrap() +// ); +// assert_eq!( +// ( +// FullKey::new( +// TableId::default(), +// gen_key_from_str(VirtualNode::ZERO, "cc"), +// epoch +// ), +// Bytes::from("333") +// ), +// iter.try_next().await.unwrap().unwrap() +// ); +// assert!(iter.try_next().await.unwrap().is_none()); +// } +// }; + +// // Update aa, delete bb, cc unchanged +// let batch2 = vec![ +// ( +// gen_key_from_str(VirtualNode::ZERO, "aa"), +// StorageValue::new_put("111_new"), +// ), +// ( +// gen_key_from_str(VirtualNode::ZERO, "bb"), +// StorageValue::new_delete(), +// ), +// ]; + +// local +// .ingest_batch( +// batch2, +// vec![], +// WriteOptions { +// epoch: epoch1, +// table_id: Default::default(), +// }, +// ) +// .await +// .unwrap(); + +// assert_new_value(epoch1).await; + +// let epoch2 = epoch1 + 1; +// local.seal_current_epoch(epoch2, SealCurrentEpochOptions::for_test()); + +// // Write to epoch2 +// local +// .ingest_batch( +// batch1, +// vec![], +// WriteOptions { +// epoch: epoch2, +// table_id: Default::default(), +// }, +// ) +// .await +// .unwrap(); +// local.seal_current_epoch(u64::MAX, SealCurrentEpochOptions::for_test()); +// // Assert epoch 1 unchanged +// assert_new_value(epoch1).await; +// // Assert epoch 2 correctness +// assert_old_value(epoch2).await; + +// let ssts1 = hummock_storage +// .seal_and_sync_epoch(epoch1) +// .await +// .unwrap() +// .uncommitted_ssts; +// assert_new_value(epoch1).await; +// assert_old_value(epoch2).await; + +// let ssts2 = hummock_storage +// .seal_and_sync_epoch(epoch2) +// .await +// .unwrap() +// .uncommitted_ssts; +// assert_new_value(epoch1).await; +// assert_old_value(epoch2).await; + +// assert!(!ssts1.is_empty()); +// assert!(!ssts2.is_empty()); +// } #[tokio::test] async fn test_delete_get_v2() { diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index c38ed4b65a82a..3c141b502cc02 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -37,7 +37,7 @@ use crate::hummock::compactor::{ TtlCompactionFilter, }; use crate::hummock::iterator::{ - ForwardMergeRangeIterator, SkipWatermarkIterator, UnorderedMergeIteratorInner, UserIterator, + ForwardMergeRangeIterator, MergeIterator, SkipWatermarkIterator, UserIterator, }; use crate::hummock::multi_builder::TableBuilderFactory; use crate::hummock::sstable::DEFAULT_ENTRY_SIZE; @@ -349,7 
+349,7 @@ pub async fn check_compaction_result( } } } - let iter = UnorderedMergeIteratorInner::for_compactor(table_iters); + let iter = MergeIterator::for_compactor(table_iters); let mut left_iter = UserIterator::new( SkipWatermarkIterator::from_safe_epoch_watermarks(iter, &compact_task.table_watermarks), (Bound::Unbounded, Bound::Unbounded), diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index 49dd1b6b23abb..94c80bce63777 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -45,8 +45,7 @@ use crate::hummock::compactor::{ fast_compactor_runner, CompactOutput, CompactionFilter, Compactor, CompactorContext, }; use crate::hummock::iterator::{ - Forward, ForwardMergeRangeIterator, HummockIterator, SkipWatermarkIterator, - UnorderedMergeIteratorInner, + Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, SkipWatermarkIterator, }; use crate::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory}; use crate::hummock::value::HummockValue; @@ -233,7 +232,7 @@ impl CompactorRunner { Ok(( SkipWatermarkIterator::from_safe_epoch_watermarks( MonitoredCompactorIterator::new( - UnorderedMergeIteratorInner::for_compactor(table_iters), + MergeIterator::for_compactor(table_iters), task_progress.clone(), ), &self.compact_task.table_watermarks, diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index 63ae6ffa301de..1121add4841e1 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -37,7 +37,7 @@ use crate::hummock::compactor::{CompactOutput, Compactor}; use crate::hummock::event_handler::uploader::UploadTaskPayload; use crate::hummock::event_handler::LocalInstanceId; use crate::hummock::iterator::{ - Forward, ForwardMergeRangeIterator, HummockIterator, OrderedMergeIteratorInner, + Forward, ForwardMergeRangeIterator, HummockIterator, MergeIterator, }; use crate::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferBatchInner, SharedBufferVersionedEntry, @@ -258,7 +258,7 @@ async fn compact_shared_buffer( let handle = compaction_executor.spawn(async move { compactor .run( - OrderedMergeIteratorInner::new(forward_iters), + MergeIterator::new(forward_iters), multi_filter_key_extractor, CompactionDeleteRangeIterator::new(del_iter), ) @@ -373,7 +373,7 @@ pub async fn merge_imms_in_memory( epochs.sort(); // use merge iterator to merge input imms - let mut mi = OrderedMergeIteratorInner::new(imm_iters); + let mut mi = MergeIterator::new(imm_iters); mi.rewind().await?; let mut items = Vec::with_capacity(kv_count); while mi.is_valid() { diff --git a/src/storage/src/hummock/iterator/backward_merge.rs b/src/storage/src/hummock/iterator/backward_merge.rs index cdcad9d53dc7d..f8464a777e078 100644 --- a/src/storage/src/hummock/iterator/backward_merge.rs +++ b/src/storage/src/hummock/iterator/backward_merge.rs @@ -18,7 +18,7 @@ mod test { default_builder_opt_for_test, gen_iterator_test_sstable_base, iterator_test_key_of, iterator_test_value_of, mock_sstable_store, TEST_KEYS_COUNT, }; - use crate::hummock::iterator::{HummockIterator, UnorderedMergeIteratorInner}; + use crate::hummock::iterator::{HummockIterator, MergeIterator}; use crate::hummock::BackwardSstableIterator; #[tokio::test] @@ -54,7 +54,7 @@ mod test { BackwardSstableIterator::new(table2, 
sstable_store), ]; - let mut mi = UnorderedMergeIteratorInner::new(iters); + let mut mi = MergeIterator::new(iters); let mut i = 3 * TEST_KEYS_COUNT; mi.rewind().await.unwrap(); while mi.is_valid() { @@ -107,7 +107,7 @@ mod test { BackwardSstableIterator::new(table2, sstable_store), ]; - let mut mi = UnorderedMergeIteratorInner::new(iters); + let mut mi = MergeIterator::new(iters); // right edge case mi.seek(iterator_test_key_of(0).to_ref()).await.unwrap(); @@ -173,7 +173,7 @@ mod test { BackwardSstableIterator::new(table0, sstable_store), ]; - let mut mi = UnorderedMergeIteratorInner::new(iters); + let mut mi = MergeIterator::new(iters); mi.rewind().await.unwrap(); let mut count = 0; diff --git a/src/storage/src/hummock/iterator/backward_user.rs b/src/storage/src/hummock/iterator/backward_user.rs index f6e41ab876cc7..1b74cfcbe383e 100644 --- a/src/storage/src/hummock/iterator/backward_user.rs +++ b/src/storage/src/hummock/iterator/backward_user.rs @@ -311,7 +311,7 @@ mod tests { iterator_test_bytes_user_key_of, iterator_test_user_key_of, iterator_test_value_of, mock_sstable_store, TEST_KEYS_COUNT, }; - use crate::hummock::iterator::UnorderedMergeIteratorInner; + use crate::hummock::iterator::MergeIterator; use crate::hummock::test_utils::gen_test_sstable; use crate::hummock::value::HummockValue; use crate::hummock::{BackwardSstableIterator, SstableStoreRef, TableHolder}; @@ -350,7 +350,7 @@ mod tests { BackwardSstableIterator::new(table0, sstable_store), ]; - let mi = UnorderedMergeIteratorInner::new(backward_iters); + let mi = MergeIterator::new(backward_iters); let mut ui = BackwardUserIterator::for_test(mi, (Unbounded, Unbounded)); let mut i = 3 * TEST_KEYS_COUNT; ui.rewind().await.unwrap(); @@ -401,7 +401,7 @@ mod tests { BackwardSstableIterator::new(table2, sstable_store), ]; - let bmi = UnorderedMergeIteratorInner::new(backward_iters); + let bmi = MergeIterator::new(backward_iters); let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, Unbounded)); // right edge case @@ -460,7 +460,7 @@ mod tests { BackwardSstableIterator::new(table0, sstable_store.clone()), BackwardSstableIterator::new(table1, sstable_store), ]; - let bmi = UnorderedMergeIteratorInner::new(backward_iters); + let bmi = MergeIterator::new(backward_iters); let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, Unbounded)); bui.rewind().await.unwrap(); @@ -503,7 +503,7 @@ mod tests { let sstable = gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; - let bmi = UnorderedMergeIteratorInner::new(backward_iters); + let bmi = MergeIterator::new(backward_iters); let begin_key = Included(iterator_test_bytes_user_key_of(2)); let end_key = Included(iterator_test_bytes_user_key_of(7)); @@ -580,7 +580,7 @@ mod tests { let sstable = gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; - let bmi = UnorderedMergeIteratorInner::new(backward_iters); + let bmi = MergeIterator::new(backward_iters); let begin_key = Excluded(iterator_test_bytes_user_key_of(2)); let end_key = Included(iterator_test_bytes_user_key_of(7)); @@ -658,7 +658,7 @@ mod tests { let sstable = gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; let backward_iters = vec![BackwardSstableIterator::new(sstable, sstable_store)]; - let bmi = UnorderedMergeIteratorInner::new(backward_iters); + let bmi = 
MergeIterator::new(backward_iters); let end_key = Included(iterator_test_bytes_user_key_of(7)); let mut bui = BackwardUserIterator::for_test(bmi, (Unbounded, end_key)); @@ -734,7 +734,7 @@ mod tests { let handle = gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; let backward_iters = vec![BackwardSstableIterator::new(handle, sstable_store)]; - let bmi = UnorderedMergeIteratorInner::new(backward_iters); + let bmi = MergeIterator::new(backward_iters); let begin_key = Included(iterator_test_bytes_user_key_of(2)); let mut bui = BackwardUserIterator::for_test(bmi, (begin_key, Unbounded)); @@ -830,7 +830,7 @@ mod tests { }; let backward_iters = vec![BackwardSstableIterator::new(handle, sstable_store)]; - let bmi = UnorderedMergeIteratorInner::new(backward_iters); + let bmi = MergeIterator::new(backward_iters); let mut bui = BackwardUserIterator::for_test(bmi, (start_bound, end_bound)); let num_puts: usize = truth .iter() @@ -1071,7 +1071,7 @@ mod tests { let backward_iters = vec![BackwardSstableIterator::new(table0, sstable_store)]; let min_epoch = (TEST_KEYS_COUNT / 5) as u64; - let mi = UnorderedMergeIteratorInner::new(backward_iters); + let mi = MergeIterator::new(backward_iters); let mut ui = BackwardUserIterator::with_min_epoch(mi, (Unbounded, Unbounded), min_epoch); ui.rewind().await.unwrap(); diff --git a/src/storage/src/hummock/iterator/forward_merge.rs b/src/storage/src/hummock/iterator/forward_merge.rs index 82c43afd0c11c..8219a7eb52823 100644 --- a/src/storage/src/hummock/iterator/forward_merge.rs +++ b/src/storage/src/hummock/iterator/forward_merge.rs @@ -28,117 +28,84 @@ mod test { gen_merge_iterator_interleave_test_sstable_iters, iterator_test_key_of, iterator_test_value_of, mock_sstable_store, TEST_KEYS_COUNT, }; - use crate::hummock::iterator::{ - Forward, HummockIterator, HummockIteratorUnion, OrderedMergeIteratorInner, - UnorderedMergeIteratorInner, - }; + use crate::hummock::iterator::{Forward, HummockIterator, MergeIterator}; use crate::hummock::sstable::{ SstableIterator, SstableIteratorReadOptions, SstableIteratorType, }; - use crate::hummock::test_utils::gen_test_sstable; use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; use crate::monitor::StoreLocalStatistic; #[tokio::test] async fn test_merge_basic() { - let mut unordered_iter: HummockIteratorUnion< - Forward, - UnorderedMergeIteratorInner, - OrderedMergeIteratorInner, - > = HummockIteratorUnion::First(UnorderedMergeIteratorInner::new( - gen_merge_iterator_interleave_test_sstable_iters(TEST_KEYS_COUNT, 3).await, - )); - let mut ordered_iter: HummockIteratorUnion< - Forward, - UnorderedMergeIteratorInner, - OrderedMergeIteratorInner, - > = HummockIteratorUnion::Second(OrderedMergeIteratorInner::new( + let mut iter = MergeIterator::new( gen_merge_iterator_interleave_test_sstable_iters(TEST_KEYS_COUNT, 3).await, - )); + ); - // Test both ordered and unordered iterators - let test_iters = vec![&mut unordered_iter, &mut ordered_iter]; - for iter in test_iters { - let mut i = 0; - iter.rewind().await.unwrap(); - while iter.is_valid() { - let key = iter.key(); - let val = iter.value(); - assert_eq!(key, iterator_test_key_of(i).to_ref()); - assert_eq!( - val.into_user_value().unwrap(), - iterator_test_value_of(i).as_slice() - ); - i += 1; - iter.next().await.unwrap(); - if i == TEST_KEYS_COUNT * 3 { - assert!(!iter.is_valid()); - break; - } + // Test merge iterators + let mut i = 0; + iter.rewind().await.unwrap(); + while iter.is_valid() { + let key = iter.key(); + let 
val = iter.value(); + assert_eq!(key, iterator_test_key_of(i).to_ref()); + assert_eq!( + val.into_user_value().unwrap(), + iterator_test_value_of(i).as_slice() + ); + i += 1; + iter.next().await.unwrap(); + if i == TEST_KEYS_COUNT * 3 { + assert!(!iter.is_valid()); + break; } - assert!(i >= TEST_KEYS_COUNT * 3); } + assert!(i >= TEST_KEYS_COUNT * 3); } #[tokio::test] async fn test_merge_seek() { - let mut unordered_iter: HummockIteratorUnion< - Forward, - UnorderedMergeIteratorInner, - OrderedMergeIteratorInner, - > = HummockIteratorUnion::First(UnorderedMergeIteratorInner::new( - gen_merge_iterator_interleave_test_sstable_iters(TEST_KEYS_COUNT, 3).await, - )); - let mut ordered_iter: HummockIteratorUnion< - Forward, - UnorderedMergeIteratorInner, - OrderedMergeIteratorInner, - > = HummockIteratorUnion::Second(OrderedMergeIteratorInner::new( + let mut iter = MergeIterator::new( gen_merge_iterator_interleave_test_sstable_iters(TEST_KEYS_COUNT, 3).await, - )); - - // Test both ordered and unordered iterators - let test_iters = vec![&mut unordered_iter, &mut ordered_iter]; - - for iter in test_iters { - // right edge case - iter.seek(iterator_test_key_of(TEST_KEYS_COUNT * 3).to_ref()) - .await - .unwrap(); - assert!(!iter.is_valid()); - - // normal case - iter.seek(iterator_test_key_of(TEST_KEYS_COUNT * 2 + 5).to_ref()) - .await - .unwrap(); - let k = iter.key(); - let v = iter.value(); - assert_eq!( - v.into_user_value().unwrap(), - iterator_test_value_of(TEST_KEYS_COUNT * 2 + 5).as_slice() - ); - assert_eq!(k, iterator_test_key_of(TEST_KEYS_COUNT * 2 + 5).to_ref()); - - iter.seek(iterator_test_key_of(17).to_ref()).await.unwrap(); - let k = iter.key(); - let v = iter.value(); - assert_eq!( - v.into_user_value().unwrap(), - iterator_test_value_of(TEST_KEYS_COUNT + 7).as_slice() - ); - assert_eq!(k, iterator_test_key_of(TEST_KEYS_COUNT + 7).to_ref()); - - // left edge case - iter.seek(iterator_test_key_of(0).to_ref()).await.unwrap(); - let k = iter.key(); - let v = iter.value(); - assert_eq!( - v.into_user_value().unwrap(), - iterator_test_value_of(0).as_slice() - ); - assert_eq!(k, iterator_test_key_of(0).to_ref()); - } + ); + + // Test merge iterators + // right edge case + iter.seek(iterator_test_key_of(TEST_KEYS_COUNT * 3).to_ref()) + .await + .unwrap(); + assert!(!iter.is_valid()); + + // normal case + iter.seek(iterator_test_key_of(TEST_KEYS_COUNT * 2 + 5).to_ref()) + .await + .unwrap(); + let k = iter.key(); + let v = iter.value(); + assert_eq!( + v.into_user_value().unwrap(), + iterator_test_value_of(TEST_KEYS_COUNT * 2 + 5).as_slice() + ); + assert_eq!(k, iterator_test_key_of(TEST_KEYS_COUNT * 2 + 5).to_ref()); + + iter.seek(iterator_test_key_of(17).to_ref()).await.unwrap(); + let k = iter.key(); + let v = iter.value(); + assert_eq!( + v.into_user_value().unwrap(), + iterator_test_value_of(TEST_KEYS_COUNT + 7).as_slice() + ); + assert_eq!(k, iterator_test_key_of(TEST_KEYS_COUNT + 7).to_ref()); + + // left edge case + iter.seek(iterator_test_key_of(0).to_ref()).await.unwrap(); + let k = iter.key(); + let v = iter.value(); + assert_eq!( + v.into_user_value().unwrap(), + iterator_test_value_of(0).as_slice() + ); + assert_eq!(k, iterator_test_key_of(0).to_ref()); } #[tokio::test] @@ -163,27 +130,7 @@ mod test { .await; let mut stats = StoreLocalStatistic::default(); - let mut unordered_iter: HummockIteratorUnion< - Forward, - UnorderedMergeIteratorInner, - OrderedMergeIteratorInner, - > = HummockIteratorUnion::First(UnorderedMergeIteratorInner::new(vec![ - SstableIterator::create( - 
sstable_store.sstable(&table0, &mut stats).await.unwrap(), - sstable_store.clone(), - read_options.clone(), - ), - SstableIterator::create( - sstable_store.sstable(&table1, &mut stats).await.unwrap(), - sstable_store.clone(), - read_options.clone(), - ), - ])); - let mut ordered_iter: HummockIteratorUnion< - Forward, - UnorderedMergeIteratorInner, - OrderedMergeIteratorInner, - > = HummockIteratorUnion::Second(OrderedMergeIteratorInner::new(vec![ + let mut iter = MergeIterator::new(vec![ SstableIterator::create( sstable_store.sstable(&table0, &mut stats).await.unwrap(), sstable_store.clone(), @@ -194,109 +141,23 @@ mod test { sstable_store.clone(), read_options.clone(), ), - ])); - - // Test both ordered and unordered iterators - let test_iters = vec![&mut unordered_iter, &mut ordered_iter]; - - for iter in test_iters { - iter.rewind().await.unwrap(); - let mut count = 0; - while iter.is_valid() { - count += 1; - iter.next().await.unwrap(); - } - assert_eq!(count, TEST_KEYS_COUNT * 2); - - iter.rewind().await.unwrap(); - let mut count = 0; - while iter.is_valid() { - count += 1; - iter.next().await.unwrap(); - } - assert_eq!(count, TEST_KEYS_COUNT * 2); - } - } - - #[tokio::test] - async fn test_ordered_merge_iter() { - let sstable_store = mock_sstable_store(); - let read_options = Arc::new(SstableIteratorReadOptions::default()); - - let non_overlapped_sstable = gen_test_sstable( - default_builder_opt_for_test(), - 0, - (0..TEST_KEYS_COUNT).filter(|x| x % 3 == 0).map(|x| { - ( - iterator_test_key_of(x), - HummockValue::put(format!("non_overlapped_{}", x).as_bytes().to_vec()), - ) - }), - sstable_store.clone(), - ) - .await; - - let overlapped_old_sstable = gen_test_sstable( - default_builder_opt_for_test(), - 1, - (0..TEST_KEYS_COUNT).filter(|x| x % 3 != 0).map(|x| { - ( - iterator_test_key_of(x), - HummockValue::put(format!("overlapped_old_{}", x).as_bytes().to_vec()), - ) - }), - sstable_store.clone(), - ) - .await; - - let overlapped_new_sstable = gen_test_sstable( - default_builder_opt_for_test(), - 2, - (0..TEST_KEYS_COUNT).filter(|x| x % 3 == 1).map(|x| { - ( - iterator_test_key_of(x), - HummockValue::put(format!("overlapped_new_{}", x).as_bytes().to_vec()), - ) - }), - sstable_store.clone(), - ) - .await; - let mut iter = OrderedMergeIteratorInner::new(vec![ - SstableIterator::create( - non_overlapped_sstable, - sstable_store.clone(), - read_options.clone(), - ), - SstableIterator::create( - overlapped_new_sstable, - sstable_store.clone(), - read_options.clone(), - ), - SstableIterator::create( - overlapped_old_sstable, - sstable_store.clone(), - read_options.clone(), - ), ]); iter.rewind().await.unwrap(); - let mut count = 0; - while iter.is_valid() { - assert_eq!(iter.key(), iterator_test_key_of(count).to_ref()); - let expected_value = match count % 3 { - 0 => format!("non_overlapped_{}", count).as_bytes().to_vec(), - 1 => format!("overlapped_new_{}", count).as_bytes().to_vec(), - 2 => format!("overlapped_old_{}", count).as_bytes().to_vec(), - _ => unreachable!(), - }; - assert_eq!(iter.value(), HummockValue::put(expected_value.as_slice())); count += 1; iter.next().await.unwrap(); } + assert_eq!(count, TEST_KEYS_COUNT * 2); - assert_eq!(count, TEST_KEYS_COUNT); + iter.rewind().await.unwrap(); + let mut count = 0; + while iter.is_valid() { + count += 1; + iter.next().await.unwrap(); + } + assert_eq!(count, TEST_KEYS_COUNT * 2); } struct CancellationTestIterator {} @@ -339,9 +200,9 @@ mod test { #[tokio::test] async fn test_merge_iter_cancel() { - let mut merge_iter = 
UnorderedMergeIteratorInner::new(vec![ - OrderedMergeIteratorInner::new(once(CancellationTestIterator {})), - OrderedMergeIteratorInner::new(once(CancellationTestIterator {})), + let mut merge_iter = MergeIterator::new(vec![ + MergeIterator::new(once(CancellationTestIterator {})), + MergeIterator::new(once(CancellationTestIterator {})), ]); merge_iter.rewind().await.unwrap(); let future = merge_iter.next(); @@ -354,7 +215,7 @@ mod test { .is_pending()); } - // Dropping the future will panic if the OrderedMergeIteratorInner is not cancellation safe. + // Dropping the future will panic if the OrderedMergeIterator is not cancellation safe. // See https://github.com/risingwavelabs/risingwave/issues/6637 } } diff --git a/src/storage/src/hummock/iterator/forward_user.rs b/src/storage/src/hummock/iterator/forward_user.rs index 963392b091ca3..135bf6313bdc6 100644 --- a/src/storage/src/hummock/iterator/forward_user.rs +++ b/src/storage/src/hummock/iterator/forward_user.rs @@ -334,7 +334,7 @@ mod tests { iterator_test_bytes_key_of_epoch, iterator_test_bytes_user_key_of, iterator_test_value_of, mock_sstable_store, TEST_KEYS_COUNT, }; - use crate::hummock::iterator::UnorderedMergeIteratorInner; + use crate::hummock::iterator::MergeIterator; use crate::hummock::sstable::{ SstableIterator, SstableIteratorReadOptions, SstableIteratorType, }; @@ -376,7 +376,7 @@ mod tests { SstableIterator::create(table2, sstable_store, read_options.clone()), ]; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded)); ui.rewind().await.unwrap(); @@ -430,7 +430,7 @@ mod tests { SstableIterator::create(table2, sstable_store, read_options), ]; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded)); // right edge case @@ -493,7 +493,7 @@ mod tests { SstableIterator::create(table1, sstable_store.clone(), read_options), ]; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded)); ui.rewind().await.unwrap(); @@ -549,7 +549,7 @@ mod tests { let table = generate_test_data(sstable_store.clone(), vec![]).await; let read_options = Arc::new(SstableIteratorReadOptions::default()); let iters = vec![SstableIterator::create(table, sstable_store, read_options)]; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let begin_key = Included(iterator_test_bytes_user_key_of(2)); let end_key = Included(iterator_test_bytes_user_key_of(7)); @@ -627,7 +627,7 @@ mod tests { gen_iterator_test_sstable_from_kv_pair(0, kv_pairs, sstable_store.clone()).await; let read_options = Arc::new(SstableIteratorReadOptions::default()); let iters = vec![SstableIterator::create(table, sstable_store, read_options)]; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let begin_key = Included(iterator_test_bytes_user_key_of(2)); let end_key = Excluded(iterator_test_bytes_user_key_of(7)); @@ -690,7 +690,7 @@ mod tests { let table = generate_test_data(sstable_store.clone(), vec![]).await; let read_options = Arc::new(SstableIteratorReadOptions::default()); let iters = vec![SstableIterator::create(table, sstable_store, read_options)]; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let end_key = Included(iterator_test_bytes_user_key_of(7)); let mut ui = 
UserIterator::for_test(mi, (Unbounded, end_key)); @@ -754,7 +754,7 @@ mod tests { let table = generate_test_data(sstable_store.clone(), vec![]).await; let read_options = Arc::new(SstableIteratorReadOptions::default()); let iters = vec![SstableIterator::create(table, sstable_store, read_options)]; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let begin_key = Included(iterator_test_bytes_user_key_of(2)); let mut ui = UserIterator::for_test(mi, (begin_key, Unbounded)); @@ -834,7 +834,7 @@ mod tests { )]; let min_epoch = (TEST_KEYS_COUNT / 5) as u64; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let mut ui = UserIterator::for_test_with_epoch(mi, (Unbounded, Unbounded), u64::MAX, min_epoch); ui.rewind().await.unwrap(); @@ -868,7 +868,7 @@ mod tests { sstable_store.clone(), Arc::new(read_options), )]; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let mut del_iter = ForwardMergeRangeIterator::new(150); del_iter.add_sst_iter(SstableDeleteRangeIterator::new(table.clone())); @@ -900,7 +900,7 @@ mod tests { )]; let mut del_iter = ForwardMergeRangeIterator::new(300); del_iter.add_sst_iter(SstableDeleteRangeIterator::new(table.clone())); - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let mut ui: UserIterator<_> = UserIterator::new(mi, (Unbounded, Unbounded), 300, 0, None, del_iter); ui.rewind().await.unwrap(); diff --git a/src/storage/src/hummock/iterator/merge_inner.rs b/src/storage/src/hummock/iterator/merge_inner.rs index c9639d9babad1..1dfd926b6f53a 100644 --- a/src/storage/src/hummock/iterator/merge_inner.rs +++ b/src/storage/src/hummock/iterator/merge_inner.rs @@ -14,35 +14,25 @@ use std::collections::binary_heap::PeekMut; use std::collections::{BinaryHeap, LinkedList}; -use std::future::Future; use std::ops::{Deref, DerefMut}; use bytes::Bytes; -use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; +use risingwave_hummock_sdk::key::{FullKey, TableKey}; use risingwave_hummock_sdk::EpochWithGap; -use crate::hummock::iterator::{DirectionEnum, Forward, HummockIterator, HummockIteratorDirection}; +use super::Forward; +use crate::hummock::iterator::{DirectionEnum, HummockIterator, HummockIteratorDirection}; use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatchIterator; use crate::hummock::value::HummockValue; use crate::hummock::HummockResult; use crate::monitor::StoreLocalStatistic; -pub trait NodeExtraOrderInfo: Eq + Ord + Send + Sync {} - -/// For unordered merge iterator, no extra order info is needed. -type UnorderedNodeExtra = (); -/// Store the order index for the order aware merge iterator -type OrderedNodeExtra = usize; -impl NodeExtraOrderInfo for UnorderedNodeExtra {} -impl NodeExtraOrderInfo for OrderedNodeExtra {} - -pub struct Node { +pub struct Node { iter: I, - extra_order_info: T, } -impl Eq for Node where Self: PartialEq {} -impl PartialOrd for Node +impl Eq for Node where Self: PartialEq {} +impl PartialOrd for Node where Self: Ord, { @@ -52,7 +42,7 @@ where } /// Implement `Ord` for unordered iter node. Only compare the key. -impl Ord for Node { +impl Ord for Node { fn cmp(&self, other: &Self) -> std::cmp::Ordering { // Note: to implement min-heap by using max-heap internally, the comparing // order should be reversed. @@ -64,53 +54,33 @@ impl Ord for Node { } } -/// Implement `Ord` for ordered iter node. Compare key and use order index as tie breaker. 
-impl Ord for Node { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - // The `extra_info` is used as a tie-breaker when the keys are equal. - match I::Direction::direction() { - DirectionEnum::Forward => other - .iter - .key() - .cmp(&self.iter.key()) - .then_with(|| other.extra_order_info.cmp(&self.extra_order_info)), - DirectionEnum::Backward => self - .iter - .key() - .cmp(&other.iter.key()) - .then_with(|| self.extra_order_info.cmp(&other.extra_order_info)), - } - } -} - -impl PartialEq for Node { +impl PartialEq for Node { fn eq(&self, other: &Self) -> bool { self.iter.key() == other.iter.key() } } -impl PartialEq for Node { - fn eq(&self, other: &Self) -> bool { - self.iter.key() == other.iter.key() && self.extra_order_info.eq(&other.extra_order_info) - } -} - /// Iterates on multiple iterators, a.k.a. `MergeIterator`. -pub struct MergeIteratorInner { +pub struct MergeIterator { /// Invalid or non-initialized iterators. - unused_iters: LinkedList>, + unused_iters: LinkedList>, /// The heap for merge sort. - heap: BinaryHeap>, - - last_table_key: Vec, + heap: BinaryHeap>, } -/// An order aware merge iterator. -#[allow(type_alias_bounds)] -pub type OrderedMergeIteratorInner = MergeIteratorInner; +impl MergeIterator { + fn collect_local_statistic_impl(&self, stats: &mut StoreLocalStatistic) { + for node in &self.heap { + node.iter.collect_local_statistic(stats); + } + for node in &self.unused_iters { + node.iter.collect_local_statistic(stats); + } + } +} -impl OrderedMergeIteratorInner { +impl MergeIterator { pub fn new(iterators: impl IntoIterator) -> Self { Self::create(iterators) } @@ -121,21 +91,13 @@ impl OrderedMergeIteratorInner { fn create(iterators: impl IntoIterator) -> Self { Self { - unused_iters: iterators - .into_iter() - .enumerate() - .map(|(i, iter)| Node { - iter, - extra_order_info: i, - }) - .collect(), + unused_iters: iterators.into_iter().map(|iter| Node { iter }).collect(), heap: BinaryHeap::new(), - last_table_key: Vec::new(), } } } -impl OrderedMergeIteratorInner> { +impl MergeIterator> { /// Used in `merge_imms_in_memory` to merge immutable memtables. pub fn current_item(&self) -> (TableKey, (EpochWithGap, HummockValue)) { let item = self @@ -148,48 +110,9 @@ impl OrderedMergeIteratorInner> { } } -impl MergeIteratorInner { - fn collect_local_statistic_impl(&self, stats: &mut StoreLocalStatistic) { - for node in &self.heap { - node.iter.collect_local_statistic(stats); - } - for node in &self.unused_iters { - node.iter.collect_local_statistic(stats); - } - } -} - -#[allow(type_alias_bounds)] -pub type UnorderedMergeIteratorInner = - MergeIteratorInner; - -impl UnorderedMergeIteratorInner { - pub fn new(iterators: impl IntoIterator) -> Self { - Self::create(iterators) - } - - pub fn for_compactor(iterators: impl IntoIterator) -> Self { - Self::create(iterators) - } - - fn create(iterators: impl IntoIterator) -> Self { - Self { - unused_iters: iterators - .into_iter() - .map(|iter| Node { - iter, - extra_order_info: (), - }) - .collect(), - heap: BinaryHeap::new(), - last_table_key: Vec::new(), - } - } -} - -impl MergeIteratorInner +impl MergeIterator where - Node: Ord, + Node: Ord, { /// Moves all iterators from the `heap` to the linked list. fn reset_heap(&mut self) { @@ -208,12 +131,6 @@ where } } -/// The behaviour of `next` of order aware merge iterator is different from the normal one, so we -/// extract this trait. -trait MergeIteratorNext { - fn next_inner(&mut self) -> impl Future> + Send + '_; -} - /// This is a wrapper for the `PeekMut` of heap. 
/// /// Several panics due to future cancellation are caused by calling `drop` on the `PeekMut` when @@ -282,54 +199,13 @@ impl<'a, T: Ord> Drop for PeekMutGuard<'a, T> { } } -impl MergeIteratorNext for OrderedMergeIteratorInner { - async fn next_inner(&mut self) -> HummockResult<()> { - let top_key = { - let top_key = self.heap.peek().expect("no inner iter").iter.key(); - self.last_table_key.clear(); - self.last_table_key - .extend_from_slice(top_key.user_key.table_key.0); - FullKey { - user_key: UserKey { - table_id: top_key.user_key.table_id, - table_key: TableKey(self.last_table_key.as_slice()), - }, - epoch_with_gap: top_key.epoch_with_gap, - } - }; - loop { - let Some(mut node) = PeekMutGuard::peek_mut(&mut self.heap, &mut self.unused_iters) - else { - break; - }; - // WARNING: within scope of BinaryHeap::PeekMut, we must carefully handle all places - // of return. Once the iterator enters an invalid state, we should - // remove it from heap before returning. - - if node.iter.key() == top_key { - if let Err(e) = node.iter.next().await { - node.pop(); - self.heap.clear(); - return Err(e); - }; - if !node.iter.is_valid() { - let node = node.pop(); - self.unused_iters.push_back(node); - } else { - node.used(); - } - } else { - node.used(); - break; - } - } - - Ok(()) - } -} +impl HummockIterator for MergeIterator +where + Node: Ord, +{ + type Direction = I::Direction; -impl MergeIteratorNext for UnorderedMergeIteratorInner { - async fn next_inner(&mut self) -> HummockResult<()> { + async fn next(&mut self) -> HummockResult<()> { let mut node = PeekMutGuard::peek_mut(&mut self.heap, &mut self.unused_iters).expect("no inner iter"); @@ -359,18 +235,6 @@ impl MergeIteratorNext for UnorderedMergeIteratorInner { Ok(()) } -} - -impl HummockIterator for MergeIteratorInner -where - Self: MergeIteratorNext, - Node: Ord, -{ - type Direction = I::Direction; - - async fn next(&mut self) -> HummockResult<()> { - self.next_inner().await - } fn key(&self) -> FullKey<&[u8]> { self.heap.peek().expect("no inner iter").iter.key() diff --git a/src/storage/src/hummock/iterator/mod.rs b/src/storage/src/hummock/iterator/mod.rs index 1c3a493d8f46f..7392709710996 100644 --- a/src/storage/src/hummock/iterator/mod.rs +++ b/src/storage/src/hummock/iterator/mod.rs @@ -35,7 +35,7 @@ mod forward_merge; pub mod forward_user; mod merge_inner; pub use forward_user::*; -pub use merge_inner::{OrderedMergeIteratorInner, UnorderedMergeIteratorInner}; +pub use merge_inner::MergeIterator; use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey}; use crate::hummock::iterator::HummockIteratorUnion::{First, Fourth, Second, Third}; diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index a9a1899d8e552..4de14e3e0de19 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -30,8 +30,8 @@ use super::version::{HummockReadVersion, StagingData, VersionUpdate}; use crate::error::StorageResult; use crate::hummock::event_handler::{HummockEvent, LocalInstanceGuard}; use crate::hummock::iterator::{ - ConcatIteratorInner, Forward, HummockIteratorUnion, OrderedMergeIteratorInner, - SkipWatermarkIterator, UnorderedMergeIteratorInner, UserIterator, + ConcatIteratorInner, Forward, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator, + UserIterator, }; use crate::hummock::shared_buffer::shared_buffer_batch::{ SharedBufferBatch, SharedBufferBatchIterator, @@ -568,11 +568,11 @@ impl 
LocalHummockStorage { } } -pub type StagingDataIterator = OrderedMergeIteratorInner< +pub type StagingDataIterator = MergeIterator< HummockIteratorUnion, SstableIterator>, >; pub type HummockStorageIteratorPayloadInner<'a> = SkipWatermarkIterator< - UnorderedMergeIteratorInner< + MergeIterator< HummockIteratorUnion< Forward, StagingDataIterator, diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 9aaa39e2c6866..6719626967662 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -39,8 +39,8 @@ use tracing::Instrument; use super::StagingDataIterator; use crate::error::StorageResult; use crate::hummock::iterator::{ - ConcatIterator, ForwardMergeRangeIterator, HummockIteratorUnion, OrderedMergeIteratorInner, - SkipWatermarkIterator, UnorderedMergeIteratorInner, UserIterator, + ConcatIterator, ForwardMergeRangeIterator, HummockIteratorUnion, MergeIterator, + SkipWatermarkIterator, UserIterator, }; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::sstable::SstableIteratorReadOptions; @@ -904,7 +904,7 @@ impl HummockVersionReader { ))); } local_stats.staging_sst_iter_count = staging_sst_iter_count; - let staging_iter: StagingDataIterator = OrderedMergeIteratorInner::new(staging_iters); + let staging_iter: StagingDataIterator = MergeIterator::new(staging_iters); let mut non_overlapping_iters = Vec::new(); let mut overlapping_iters = Vec::new(); @@ -1038,7 +1038,7 @@ impl HummockVersionReader { } // 3. build user_iterator - let merge_iter = UnorderedMergeIteratorInner::new( + let merge_iter = MergeIterator::new( once(HummockIteratorUnion::First(staging_iter)) .chain( overlapping_iters diff --git a/src/storage/src/storage_failpoints/test_iterator.rs b/src/storage/src/storage_failpoints/test_iterator.rs index 9eebdeaa64d6d..5b794da34c862 100644 --- a/src/storage/src/storage_failpoints/test_iterator.rs +++ b/src/storage/src/storage_failpoints/test_iterator.rs @@ -24,8 +24,8 @@ use crate::hummock::iterator::test_utils::{ TEST_KEYS_COUNT, }; use crate::hummock::iterator::{ - BackwardConcatIterator, BackwardUserIterator, ConcatIterator, HummockIterator, - UnorderedMergeIteratorInner, UserIterator, + BackwardConcatIterator, BackwardUserIterator, ConcatIterator, HummockIterator, MergeIterator, + UserIterator, }; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::test_utils::{ @@ -175,7 +175,7 @@ async fn test_failpoints_merge_invalid_key() { ) .await; let tables = vec![table0, table1]; - let mut mi = UnorderedMergeIteratorInner::new({ + let mut mi = MergeIterator::new({ let mut iters = vec![]; for table in tables { iters.push(SstableIterator::new( @@ -223,7 +223,7 @@ async fn test_failpoints_backward_merge_invalid_key() { ) .await; let tables = vec![table0, table1]; - let mut mi = UnorderedMergeIteratorInner::new({ + let mut mi = MergeIterator::new({ let mut iters = vec![]; for table in tables { iters.push(BackwardSstableIterator::new(table, sstable_store.clone())); @@ -279,7 +279,7 @@ async fn test_failpoints_user_read_err() { ), ]; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let mut ui = UserIterator::for_test(mi, (Unbounded, Unbounded)); ui.rewind().await.unwrap(); @@ -331,7 +331,7 @@ async fn test_failpoints_backward_user_read_err() { BackwardSstableIterator::new(table1, sstable_store.clone()), ]; - let mi = UnorderedMergeIteratorInner::new(iters); + let mi = MergeIterator::new(iters); let mut ui 
= BackwardUserIterator::for_test(mi, (Unbounded, Unbounded)); ui.rewind().await.unwrap();
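
For context on the pattern this refactor converges on: `MergeIterator` keeps only the key-based, heap-driven k-way merge, and the order-index tie-breaker of the removed `OrderedMergeIteratorInner` is gone, so ties between sources are now broken arbitrarily. Below is a minimal, self-contained sketch of that pattern, assuming simplified synchronous sources; `MiniMergeIter`, its `Node` fields, and the `u64`/`&str` entries are hypothetical stand-ins, not the actual async `HummockIterator`-based implementation in src/storage/src/hummock/iterator/merge_inner.rs.

use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// One source of already-sorted (key, value) pairs.
struct Node {
    iter: std::vec::IntoIter<(u64, &'static str)>,
    current: (u64, &'static str),
}

impl PartialEq for Node {
    fn eq(&self, other: &Self) -> bool {
        self.current.0 == other.current.0
    }
}
impl Eq for Node {}

impl PartialOrd for Node {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Compare by key only, with the comparison reversed: `BinaryHeap` is a
/// max-heap, so reversing makes the smallest key surface at the top
/// (min-heap behaviour), mirroring the forward arm of `Node::cmp` above.
impl Ord for Node {
    fn cmp(&self, other: &Self) -> Ordering {
        other.current.0.cmp(&self.current.0)
    }
}

/// Hypothetical, simplified stand-in for `MergeIterator`: a k-way merge over
/// sorted inputs using a binary heap of per-source nodes.
struct MiniMergeIter {
    heap: BinaryHeap<Node>,
}

impl MiniMergeIter {
    fn new(sources: Vec<Vec<(u64, &'static str)>>) -> Self {
        let mut heap = BinaryHeap::new();
        for source in sources {
            let mut iter = source.into_iter();
            if let Some(current) = iter.next() {
                heap.push(Node { iter, current });
            }
        }
        Self { heap }
    }

    /// Pop the globally smallest key, then re-insert its source if it still
    /// has entries. Ties between sources resolve in whatever order the heap
    /// yields them, which is the behaviour the unified iterator settles on.
    fn next(&mut self) -> Option<(u64, &'static str)> {
        let mut node = self.heap.pop()?;
        let out = node.current;
        if let Some(next) = node.iter.next() {
            node.current = next;
            self.heap.push(node);
        }
        Some(out)
    }
}

fn main() {
    let mut merged = MiniMergeIter::new(vec![
        vec![(1, "a"), (4, "d")],
        vec![(2, "b"), (5, "e")],
        vec![(3, "c"), (6, "f")],
    ]);
    while let Some((key, value)) = merged.next() {
        println!("{key} -> {value}");
    }
    // Prints keys 1..=6 in order, interleaved from the three sorted inputs.
}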