diff --git a/src/storage/hummock_test/src/hummock_read_version_tests.rs b/src/storage/hummock_test/src/hummock_read_version_tests.rs index 0da472629751d..664a6a0a6776a 100644 --- a/src/storage/hummock_test/src/hummock_read_version_tests.rs +++ b/src/storage/hummock_test/src/hummock_read_version_tests.rs @@ -71,15 +71,14 @@ async fn test_read_version_basic() { Bound::Included(Bytes::from(key.to_vec())), )); - let (staging_imm_iter, staging_sst_iter) = + let staging_data_iter = read_version .staging() .prune_overlap(epoch, TableId::default(), &key_range); - let staging_imm = staging_imm_iter.cloned().collect_vec(); + let staging_imm = staging_data_iter.cloned().collect_vec(); assert_eq!(1, staging_imm.len()); - assert_eq!(0, staging_sst_iter.count()); assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch)); } @@ -108,15 +107,14 @@ async fn test_read_version_basic() { Bound::Included(Bytes::from(key.to_vec())), Bound::Included(Bytes::from(key.to_vec())), )); - let (staging_imm_iter, staging_sst_iter) = + let staging_data_iter = read_version .staging() .prune_overlap(epoch, TableId::default(), &key_range); - let staging_imm = staging_imm_iter.cloned().collect_vec(); + let staging_imm = staging_data_iter.cloned().collect_vec(); assert_eq!(1, staging_imm.len() as u64); - assert_eq!(0, staging_sst_iter.count()); assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch)); } } @@ -124,23 +122,20 @@ async fn test_read_version_basic() { { // test clean imm with sst update info let staging = read_version.staging(); - assert_eq!(6, staging.imm.len()); + assert_eq!(6, staging.data.len()); let batch_id_vec_for_clear = staging - .imm + .data .iter() - .rev() - .map(|imm| imm.batch_id()) + .flat_map(|data| data.to_imm().map(|imm| imm.batch_id())) .take(3) - .rev() .collect::>(); let epoch_id_vec_for_clear = staging - .imm + .data .iter() - .rev() + .flat_map(|data| data.to_imm()) .map(|imm| imm.min_epoch()) .take(3) - .rev() .collect::>(); let dummy_sst = Arc::new(StagingSstableInfo::new( @@ -195,13 +190,11 @@ async fn test_read_version_basic() { // imm(0, 1, 2) => sst{sst_object_id: 1} // staging => {imm(3, 4, 5), sst[{sst_object_id: 1}, {sst_object_id: 2}]} let staging = read_version.staging(); - assert_eq!(3, read_version.staging().imm.len()); - assert_eq!(1, read_version.staging().sst.len()); - assert_eq!(2, read_version.staging().sst[0].sstable_infos().len()); + assert_eq!(4, read_version.staging().data.len()); let remain_batch_id_vec = staging - .imm + .data .iter() - .map(|imm| imm.batch_id()) + .flat_map(|data| data.to_imm().map(|imm| imm.batch_id())) .collect::>(); assert!(remain_batch_id_vec.iter().any(|batch_id| *batch_id > 2)); } @@ -215,20 +208,25 @@ async fn test_read_version_basic() { Bound::Included(Bytes::from(key_range_right)), )); - let (staging_imm_iter, staging_sst_iter) = + let staging_data_iter = read_version .staging() .prune_overlap(epoch, TableId::default(), &key_range); - let staging_imm = staging_imm_iter.cloned().collect_vec(); - assert_eq!(1, staging_imm.len()); + let staging_data = staging_data_iter.cloned().collect_vec(); + assert_eq!(2, staging_data.len()); - assert_eq!(test_epoch(4), staging_imm[0].min_epoch()); + assert_eq!(test_epoch(4), staging_data[0].min_epoch()); - let staging_ssts = staging_sst_iter.cloned().collect_vec(); - assert_eq!(2, staging_ssts.len()); - assert_eq!(1, staging_ssts[0].get_object_id()); - assert_eq!(2, staging_ssts[1].get_object_id()); + match &staging_data[1] { + StagingData::Sst(sst) => { + assert_eq!(1, 
sst.sstable_infos()[0].sst_info.get_object_id()); + assert_eq!(2, sst.sstable_infos()[1].sst_info.get_object_id()); + } + StagingData::ImmMem(_) => { + unreachable!("can not be immemtable"); + } + } } { @@ -240,18 +238,18 @@ async fn test_read_version_basic() { Bound::Included(Bytes::from(key_range_right)), )); - let (staging_imm_iter, staging_sst_iter) = + let staging_data_iter = read_version .staging() .prune_overlap(epoch, TableId::default(), &key_range); - let staging_imm = staging_imm_iter.cloned().collect_vec(); - assert_eq!(1, staging_imm.len()); - assert_eq!(test_epoch(4), staging_imm[0].min_epoch()); + let staging_data = staging_data_iter.cloned().collect_vec(); + assert_eq!(2, staging_data.len()); + assert_eq!(test_epoch(4), staging_data[0].min_epoch()); - let staging_ssts = staging_sst_iter.cloned().collect_vec(); - assert_eq!(1, staging_ssts.len()); - assert_eq!(2, staging_ssts[0].get_object_id()); + // let staging_ssts = staging_sst_iter.cloned().collect_vec(); + // assert_eq!(1, staging_ssts.len()); + // assert_eq!(2, staging_ssts[0].get_object_id()); } } @@ -294,23 +292,22 @@ async fn test_read_filter_basic() { let key = Bytes::from(iterator_test_table_key_of(epoch as usize)); let key_range = map_table_key_range((Bound::Included(key.clone()), Bound::Included(key))); - let (staging_imm, staging_sst) = { + let staging_data = { let read_guard = read_version.read(); - let (staging_imm_iter, staging_sst_iter) = { - read_guard - .staging() - .prune_overlap(epoch, TableId::default(), &key_range) - }; - - ( - staging_imm_iter.cloned().collect_vec(), - staging_sst_iter.cloned().collect_vec(), - ) + read_guard + .staging() + .prune_overlap(epoch, TableId::default(), &key_range) + .cloned() + .collect_vec() }; - assert_eq!(1, staging_imm.len()); - assert_eq!(0, staging_sst.len()); - assert!(staging_imm.iter().any(|imm| imm.min_epoch() <= epoch)); + assert_eq!(1, staging_data.len()); + assert!(staging_data.iter().any(|data| { + match data { + StagingData::ImmMem(imm) => imm.min_epoch() <= epoch, + StagingData::Sst(_) => unreachable!("can not be sstable"), + } + })); // test read_filter_for_version { @@ -320,10 +317,9 @@ async fn test_read_filter_basic() { .unwrap(); assert_eq!(1, hummock_read_snapshot.0.len()); - assert_eq!(0, hummock_read_snapshot.1.len()); assert_eq!( read_version.read().committed().max_committed_epoch(), - hummock_read_snapshot.2.max_committed_epoch() + hummock_read_snapshot.1.max_committed_epoch() ); } } diff --git a/src/storage/hummock_test/src/hummock_storage_tests.rs b/src/storage/hummock_test/src/hummock_storage_tests.rs index e7b03fa61c504..140bb5a073e0c 100644 --- a/src/storage/hummock_test/src/hummock_storage_tests.rs +++ b/src/storage/hummock_test/src/hummock_storage_tests.rs @@ -539,8 +539,7 @@ async fn test_state_store_sync() { { // after sync 1 epoch let read_version = hummock_storage.read_version(); - assert_eq!(1, read_version.read().staging().imm.len()); - assert!(read_version.read().staging().sst.is_empty()); + assert_eq!(1, read_version.read().staging().data.len()); } { @@ -581,8 +580,7 @@ async fn test_state_store_sync() { { // after sync all epoch let read_version = hummock_storage.read_version(); - assert!(read_version.read().staging().imm.is_empty()); - assert!(read_version.read().staging().sst.is_empty()); + assert!(read_version.read().staging().data.is_empty()); } { @@ -1461,7 +1459,7 @@ async fn test_hummock_version_reader() { .read() .committed() .max_committed_epoch(), - read_snapshot.2.max_committed_epoch() + 
read_snapshot.1.max_committed_epoch() ); let iter = hummock_version_reader diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index cbbc3f656e8e0..c0a8515706960 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -122,6 +122,11 @@ impl BufferTracker { pub fn need_more_flush(&self) -> bool { self.get_buffer_size() > self.flush_threshold + self.global_upload_task_size.get() as usize } + + #[cfg(test)] + pub(crate) fn flush_threshold(&self) -> usize { + self.flush_threshold + } } #[derive(Clone)] @@ -431,6 +436,7 @@ impl HummockEventHandler { /// This function will be performed under the protection of the `read_version_mapping` read /// lock, and add write lock on each `read_version` operation + /// TODO: it may cost a lot of CPU to update every local-version for every upload task. We can use instance id of upload task to optimize this operation fn for_each_read_version(&self, mut f: impl FnMut(&mut HummockReadVersion)) { self.local_read_version_mapping .values() @@ -767,24 +773,9 @@ impl HummockEventHandler { self.handle_data_spilled(staging_sstable_info); } - UploaderEvent::ImmMerged(merge_output) => { + UploaderEvent::ImmMerged(_merge_output) => { // update read version for corresponding table shards - if let Some(read_version) = self - .local_read_version_mapping - .get(&merge_output.instance_id) - { - read_version - .write() - .update(VersionUpdate::Staging(StagingData::MergedImmMem( - merge_output.merged_imm, - merge_output.imm_ids, - ))); - } else { - warn!( - "handle ImmMerged: table instance not found. table {:?}, instance {}", - &merge_output.table_id, &merge_output.instance_id - ) - } + unreachable!("This feature has been removed"); } } } @@ -1001,207 +992,22 @@ fn to_sync_result(result: &HummockResult) -> HummockResult(); - spawn_upload_task_tx.send(tx).unwrap(); - spawn(async move { - // wait for main thread to notify returning error - rx.await.unwrap(); - Err(HummockError::other("".to_string())) - }) - }), - Arc::new(move |_, _, imms, _| { - let (tx, rx) = oneshot::channel::<()>(); - let (finish_tx, finish_rx) = oneshot::channel::<()>(); - spawn_merging_task_tx.send((tx, finish_rx)).unwrap(); - spawn(async move { - rx.await.unwrap(); - finish_tx.send(()).unwrap(); - imms[0].clone() - }) - }), - CacheRefiller::default_spawn_refill_task(), - ); - - let tx = event_handler.event_sender(); - - let _join_handle = spawn(event_handler.start_hummock_event_handler_worker()); - - let (read_version_tx, read_version_rx) = oneshot::channel(); - - tx.send(HummockEvent::RegisterReadVersion { - table_id, - new_read_version_sender: read_version_tx, - is_replicated: false, - vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT)), - }) - .unwrap(); - let (read_version, guard) = read_version_rx.await.unwrap(); - let instance_id = guard.instance_id; - - let build_batch = |epoch, spill_offset| { - SharedBufferBatch::build_shared_buffer_batch( - epoch, - spill_offset, - vec![(TableKey(Bytes::from("key")), HummockValue::Delete)], - 10, - table_id, - instance_id, - None, - ) - }; - - let epoch1 = epoch0.next_epoch(); - let imm1 = build_batch(epoch1, 0); - read_version - .write() - .update(VersionUpdate::Staging(StagingData::ImmMem(imm1.clone()))); - tx.send(HummockEvent::ImmToUploader(imm1.clone())).unwrap(); - tx.send(HummockEvent::SealEpoch { - epoch: epoch1, - is_checkpoint: true, - }) - .unwrap(); - let (sync_tx, mut 
sync_rx) = oneshot::channel(); - tx.send(HummockEvent::AwaitSyncEpoch { - new_sync_epoch: epoch1, - sync_result_sender: sync_tx, - }) - .unwrap(); - - let upload_finish_tx = spawn_upload_task_rx.recv().await.unwrap(); - assert!(poll_fn(|cx| Poll::Ready(sync_rx.poll_unpin(cx))) - .await - .is_pending()); - - let epoch2 = epoch1.next_epoch(); - let mut imm_ids = Vec::new(); - for i in 0..10 { - let imm = build_batch(epoch2, i); - imm_ids.push(imm.batch_id()); - read_version - .write() - .update(VersionUpdate::Staging(StagingData::ImmMem(imm.clone()))); - tx.send(HummockEvent::ImmToUploader(imm)).unwrap(); - } - - for (staging_imm, imm_id) in read_version - .read() - .staging() - .imm - .iter() - .zip_eq_debug(imm_ids.iter().copied().rev().chain(once(imm1.batch_id()))) - { - assert_eq!(staging_imm.batch_id(), imm_id); - } - - // should start merging task - tx.send(HummockEvent::SealEpoch { - epoch: epoch2, - is_checkpoint: false, - }) - .unwrap(); - - println!("before wait spawn merging task"); - - let (merging_start_tx, merging_finish_rx) = spawn_merging_task_rx.recv().await.unwrap(); - merging_start_tx.send(()).unwrap(); - - println!("after wait spawn merging task"); - - // yield to possibly poll the merging task, though it shouldn't poll it because there is unfinished syncing task - yield_now().await; - - for (staging_imm, imm_id) in read_version - .read() - .staging() - .imm - .iter() - .zip_eq_debug(imm_ids.iter().copied().rev().chain(once(imm1.batch_id()))) - { - assert_eq!(staging_imm.batch_id(), imm_id); - } - - upload_finish_tx.send(()).unwrap(); - assert!(sync_rx.await.unwrap().is_err()); - - merging_finish_rx.await.unwrap(); - - // yield to poll the merging task, and then it should have finished. - for _ in 0..10 { - yield_now().await; - } - - assert_eq!( - read_version - .read() - .staging() - .imm - .iter() - .map(|imm| imm.batch_id()) - .collect_vec(), - vec![*imm_ids.last().unwrap(), imm1.batch_id()] - ); - } - #[tokio::test] async fn test_clear_shared_buffer() { let epoch0 = 233; diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index d00f64c42cee3..2b2c69d858d7a 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -242,15 +242,13 @@ impl UploadingTask { fn new(payload: UploadTaskPayload, context: &UploaderContext) -> Self { assert!(!payload.is_empty()); - let mut epochs = payload + let epochs = payload .iter() .flat_map(|imm| imm.epochs().clone()) .sorted() .dedup() .collect_vec(); - // reverse to make newer epochs comes first - epochs.reverse(); let imm_ids = payload.iter().map(|imm| imm.batch_id()).collect_vec(); let task_size = payload.iter().map(|imm| imm.size()).sum(); let task_info = UploadTaskInfo { @@ -395,6 +393,10 @@ struct UnsealedEpochData { } impl UnsealedEpochData { + fn get_imm_data_size(&self) -> usize { + self.imms.iter().map(|imm| imm.size()).sum() + } + fn flush(&mut self, context: &UploaderContext) { let imms = self.imms.drain(..).collect_vec(); if !imms.is_empty() { @@ -654,21 +656,6 @@ impl SealedData { .map(|imms| imms.len()) .sum() } - - #[cfg(test)] - fn imms_by_epoch(&self) -> BTreeMap> { - let mut imms_by_epoch: BTreeMap> = BTreeMap::new(); - self.imms_by_table_shard.iter().for_each(|(_, imms)| { - for imm in imms { - debug_assert!(imm.max_epoch() == imm.min_epoch()); - imms_by_epoch - .entry(imm.max_epoch()) - .or_default() - .push(imm.clone()); - } - }); - imms_by_epoch - } } struct SyncingData { @@ -795,11 +782,6 
@@ impl HummockUploader { } } - #[cfg(test)] - pub(crate) fn imm_merge_threshold(&self) -> usize { - self.context.imm_merge_threshold - } - pub(crate) fn buffer_tracker(&self) -> &BufferTracker { &self.context.buffer_tracker } @@ -891,7 +873,7 @@ impl HummockUploader { pub(crate) fn start_merge_imms(&mut self, sealed_epoch: HummockEpoch) { // skip merging if merge threshold is 1 - if self.context.imm_merge_threshold <= 1 { + if self.context.imm_merge_threshold < usize::MAX { return; } @@ -1054,20 +1036,31 @@ impl HummockUploader { } } - pub(crate) fn may_flush(&mut self) { + pub(crate) fn may_flush(&mut self) -> bool { + let mut flushed = false; if self.context.buffer_tracker.need_more_flush() { self.sealed_data.flush(&self.context, true); + flushed = true; } if self.context.buffer_tracker.need_more_flush() { - // iterate from older epoch to newer epoch - for unsealed_data in self.unsealed_data.values_mut() { + let mut unseal_epochs = vec![]; + for (epoch, unsealed_data) in &self.unsealed_data { + unseal_epochs.push((unsealed_data.get_imm_data_size(), *epoch)); + } + // flush large data at first to avoid generate small files. + unseal_epochs.sort_by_key(|item| item.0); + unseal_epochs.reverse(); + for (_, epoch) in unseal_epochs { + let unsealed_data = self.unsealed_data.get_mut(&epoch).unwrap(); unsealed_data.flush(&self.context); + flushed = true; if !self.context.buffer_tracker.need_more_flush() { break; } } } + flushed } pub(crate) fn clear(&mut self) { @@ -1146,7 +1139,9 @@ impl HummockUploader { for unsealed_data in self.unsealed_data.values_mut() { // if None, there is no spilling task. Search for the unsealed data of the next epoch in // the next iteration. - if let Some(sstable_info) = ready!(unsealed_data.spilled_data.poll_success_spill(cx)) { + if let Poll::Ready(Some(sstable_info)) = + unsealed_data.spilled_data.poll_success_spill(cx) + { return Poll::Ready(Some(sstable_info)); } } @@ -1264,6 +1259,7 @@ mod tests { pub trait UploadOutputFuture = Future> + Send + 'static; + pub trait UploadFn = Fn(UploadTaskPayload, UploadTaskInfo) -> Fut + Send + Sync + 'static; @@ -1516,107 +1512,6 @@ mod tests { assert_eq!(epoch1, uploader.max_committed_epoch()); } - #[tokio::test] - async fn test_uploader_merge_imms_without_flush() { - let mut uploader = test_uploader(dummy_success_upload_future); - let mut all_imms = VecDeque::new(); - // assume a chckpoint consists of 11 epochs - let ckpt_intervals = 11; - let imm_merge_threshold: usize = uploader.imm_merge_threshold(); - - // For each epoch, we gen imm for 2 shards and add them to uploader and seal the epoch - // afterward. check uploader's state after each epoch has been sealed - // When we get IMM_MERGE_THRESHOLD epochs, there should be merging task started for sealed - // data. Then we await the merging task and check the uploader's state again. 
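Reviewer note: the reworked `may_flush` above now returns whether anything was flushed, and drains unsealed epochs ordered by their in-memory imm size (largest first) so spilling produces fewer small files. A minimal sketch of that ordering step, using a hypothetical `flush_order` helper over plain `epoch -> size` pairs instead of the real `UnsealedEpochData`:

```rust
// Sketch only: illustrates the "largest unsealed epoch first" flush order used by the
// reworked `may_flush`. `sizes_by_epoch` stands in for `self.unsealed_data`; the real
// code also stops early once `buffer_tracker.need_more_flush()` turns false.
use std::collections::BTreeMap;

fn flush_order(sizes_by_epoch: &BTreeMap<u64, usize>) -> Vec<u64> {
    let mut by_size: Vec<(usize, u64)> = sizes_by_epoch
        .iter()
        .map(|(epoch, size)| (*size, *epoch))
        .collect();
    // Sort ascending by imm size, then reverse so the biggest epoch is spilled first,
    // mirroring the `sort_by_key` + `reverse` pair in the diff.
    by_size.sort_by_key(|(size, _)| *size);
    by_size.reverse();
    by_size.into_iter().map(|(_, epoch)| epoch).collect()
}
```

For example, with sizes `{1: 10_000, 2: 4_000_000, 3: 500_000}` the order is `[2, 3, 1]`, so one large spill file is written before any small ones.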
- let mut merged_imms = VecDeque::new(); - - let mut epoch = INITIAL_EPOCH; - for i in 1..=ckpt_intervals { - epoch.inc_epoch(); - let mut imm1 = gen_imm(epoch).await; - let mut imm2 = gen_imm(epoch).await; - - imm1.instance_id = 1 as LocalInstanceId; - imm2.instance_id = 2 as LocalInstanceId; - - uploader.add_imm(imm1.clone()); - uploader.add_imm(imm2.clone()); - - // newer imm comes in front - all_imms.push_front(imm1); - all_imms.push_front(imm2); - - uploader.seal_epoch(epoch); - - assert_eq!(epoch, uploader.max_sealed_epoch); - // check sealed data has two imms - let imms_by_epoch = uploader.sealed_data.imms_by_epoch(); - if let Some((e, imms)) = imms_by_epoch.last_key_value() - && *e == epoch - { - assert_eq!(2, imms.len()); - } - - let epoch_cnt = i; - - if epoch_cnt < imm_merge_threshold { - assert!(uploader.sealed_data.merging_tasks.is_empty()); - assert!(uploader.sealed_data.spilled_data.is_empty()); - assert_eq!(epoch_cnt, uploader.sealed_data.epochs.len()); - } else { - assert_eq!(epoch_cnt, uploader.sealed_data.epochs.len()); - - let unmerged_imm_cnt: usize = epoch_cnt - imm_merge_threshold * merged_imms.len(); - - if unmerged_imm_cnt < imm_merge_threshold { - continue; - } - - let imms_by_shard = &mut uploader.sealed_data.imms_by_table_shard; - // check shard 1 - if let Some(imms) = imms_by_shard.get(&(TEST_TABLE_ID, 1 as LocalInstanceId)) { - assert_eq!(imm_merge_threshold, imms.len()); - } - - // check shard 2 - if let Some(imms) = imms_by_shard.get(&(TEST_TABLE_ID, 2 as LocalInstanceId)) { - assert_eq!(imm_merge_threshold, imms.len()); - } - - // we have enough sealed imms, start merging task - println!("start merging task for epoch {}", epoch); - uploader.start_merge_imms(epoch); - assert!(!uploader.sealed_data.merging_tasks.is_empty()); - assert!(uploader.sealed_data.spilled_data.is_empty()); - - // check after generate merging task - if let Some(imms) = uploader - .sealed_data - .imms_by_table_shard - .get(&(TEST_TABLE_ID, 1 as LocalInstanceId)) - { - assert_eq!(0, imms.len()); - } - if let Some(imms) = uploader - .sealed_data - .imms_by_table_shard - .get(&(TEST_TABLE_ID, 2 as LocalInstanceId)) - { - assert_eq!(0, imms.len()); - } - - // poll the merging task and check the result - match uploader.next_event().await { - UploaderEvent::ImmMerged(output) => { - println!("merging task success for epoch {}", epoch); - merged_imms.push_front(output.merged_imm); - } - _ => unreachable!(), - }; - } - } - } - #[tokio::test] async fn test_uploader_empty_epoch() { let mut uploader = test_uploader(dummy_success_upload_future); @@ -1797,14 +1692,17 @@ mod tests { assert_eq!(epoch6, uploader.max_sealed_epoch); } - fn prepare_uploader_order_test() -> ( + fn prepare_uploader_order_test( + config: &StorageOpts, + skip_schedule: bool, + ) -> ( BufferTracker, HummockUploader, impl Fn(Vec) -> (BoxFuture<'static, ()>, oneshot::Sender<()>), ) { + let gauge = GenericGauge::new("test", "test").unwrap(); // flush threshold is 0. 
Flush anyway - let buffer_tracker = - BufferTracker::new(usize::MAX, 0, GenericGauge::new("test", "test").unwrap()); + let buffer_tracker = BufferTracker::from_storage_opts(config, gauge); // (the started task send the imm ids of payload, the started task wait for finish notify) #[allow(clippy::type_complexity)] let task_notifier_holder: Arc< @@ -1836,14 +1734,18 @@ mod tests { Arc::new({ move |_: UploadTaskPayload, task_info: UploadTaskInfo| { let task_notifier_holder = task_notifier_holder.clone(); - let (start_tx, finish_rx) = task_notifier_holder.lock().pop_back().unwrap(); - let start_epoch = *task_info.epochs.last().unwrap(); - let end_epoch = *task_info.epochs.first().unwrap(); + let task_item = task_notifier_holder.lock().pop_back(); + let start_epoch = *task_info.epochs.first().unwrap(); + let end_epoch = *task_info.epochs.last().unwrap(); assert!(end_epoch >= start_epoch); spawn(async move { let ssts = gen_sstable_info(start_epoch, end_epoch); - start_tx.send(task_info).unwrap(); - finish_rx.await.unwrap(); + if !skip_schedule { + let (start_tx, finish_rx) = task_item.unwrap(); + start_tx.send(task_info).unwrap(); + finish_rx.await.unwrap(); + } + Ok(UploadTaskOutput { ssts, wait_poll_timer: None, @@ -1870,8 +1772,14 @@ mod tests { } #[tokio::test] - async fn test_uploader_finish_in_order() { - let (buffer_tracker, mut uploader, new_task_notifier) = prepare_uploader_order_test(); + async fn test_uploader_finish_not_in_order() { + let config = StorageOpts { + shared_buffer_capacity_mb: 1024 * 1024, + shared_buffer_flush_ratio: 0.0, + ..Default::default() + }; + let (buffer_tracker, mut uploader, new_task_notifier) = + prepare_uploader_order_test(&config, false); let epoch1 = INITIAL_EPOCH.next_epoch(); let epoch2 = epoch1.next_epoch(); @@ -1897,7 +1805,6 @@ mod tests { assert_uploader_pending(&mut uploader).await; finish_tx2.send(()).unwrap(); - assert_uploader_pending(&mut uploader).await; finish_tx1.send(()).unwrap(); if let UploaderEvent::DataSpilled(sst) = uploader.next_event().await { @@ -2075,7 +1982,7 @@ mod tests { .unwrap() .staging_ssts; assert_eq!(3, synced_data4.len()); - assert_eq!(&vec![epoch4, epoch3], synced_data4[0].epochs()); + assert_eq!(&vec![epoch3, epoch4], synced_data4[0].epochs()); assert_eq!( &vec![imm4.batch_id(), imm3_3.batch_id()], synced_data4[0].imm_ids() @@ -2091,4 +1998,40 @@ mod tests { // epoch2: sst([imm2]) // epoch4: sst([imm4, imm3_3]), sst([imm3_2]), sst([imm3_1]) } + + #[tokio::test] + async fn test_uploader_frequently_flush() { + let config = StorageOpts { + shared_buffer_capacity_mb: 1, + shared_buffer_flush_ratio: 0.0002, + ..Default::default() + }; + let (buffer_tracker, mut uploader, _new_task_notifier) = + prepare_uploader_order_test(&config, true); + + let epoch1 = INITIAL_EPOCH.next_epoch(); + let epoch2 = epoch1.next_epoch(); + let flush_threshold = buffer_tracker.flush_threshold(); + let memory_limiter = buffer_tracker.get_memory_limiter().clone(); + + // imm2 contains data in newer epoch, but added first + let mut total_memory = 0; + while total_memory < flush_threshold { + let imm = gen_imm_with_limiter(epoch2, Some(memory_limiter.as_ref())).await; + total_memory += imm.size(); + if total_memory > flush_threshold { + break; + } + uploader.add_imm(imm); + } + let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await; + uploader.add_imm(imm); + assert!(uploader.may_flush()); + + for _ in 0..10 { + let imm = gen_imm_with_limiter(epoch1, Some(memory_limiter.as_ref())).await; + uploader.add_imm(imm); + 
assert!(!uploader.may_flush()); + } + } } diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index cef5cca728a17..7d45c6f79bce3 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -29,14 +29,13 @@ use risingwave_hummock_sdk::key::{ is_empty_key_range, vnode, vnode_range, TableKey, TableKeyRange, }; use risingwave_hummock_sdk::HummockReadEpoch; -use risingwave_pb::hummock::SstableInfo; use risingwave_rpc_client::HummockMetaClient; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::oneshot; use tracing::error; use super::local_hummock_storage::LocalHummockStorage; -use super::version::{read_filter_for_version, CommittedVersion, HummockVersionReader}; +use super::version::{read_filter_for_version, HummockVersionReader, ReadVersionTuple}; use crate::error::StorageResult; use crate::filter_key_extractor::{FilterKeyExtractorManager, RpcFilterKeyExtractorManager}; use crate::hummock::backup_reader::{BackupReader, BackupReaderRef}; @@ -55,7 +54,6 @@ use crate::hummock::{ HummockEpoch, HummockError, HummockResult, HummockStorageIterator, MemoryLimiter, SstableObjectIdManager, SstableObjectIdManagerRef, SstableStoreRef, }; -use crate::mem_table::ImmutableMemtable; use crate::monitor::{CompactorMetrics, HummockStateStoreMetrics, StoreLocalStatistic}; use crate::opts::StorageOpts; use crate::store::*; @@ -115,8 +113,6 @@ pub struct HummockStorage { compact_await_tree_reg: Option, } -pub type ReadVersionTuple = (Vec, Vec, CommittedVersion); - pub fn get_committed_read_version_tuple( version: PinnedVersion, table_id: TableId, @@ -126,7 +122,7 @@ pub fn get_committed_read_version_tuple( if let Some(index) = version.table_watermark_index().get(&table_id) { index.rewrite_range_with_table_watermark(epoch, &mut key_range) } - (key_range, (vec![], vec![], version)) + (key_range, (vec![], version)) } impl HummockStorage { diff --git a/src/storage/src/hummock/store/local_hummock_storage.rs b/src/storage/src/hummock/store/local_hummock_storage.rs index 5215a9eaf18db..f167e7f44c734 100644 --- a/src/storage/src/hummock/store/local_hummock_storage.rs +++ b/src/storage/src/hummock/store/local_hummock_storage.rs @@ -586,13 +586,18 @@ impl LocalHummockStorage { } } -pub type StagingDataIterator = MergeIterator< - HummockIteratorUnion, SstableIterator>, +pub type StorageSubIteratorUnion<'a> = HummockIteratorUnion< + Forward, + SharedBufferBatchIterator, + SstableIterator, + ConcatIteratorInner, + MemTableHummockIterator<'a>, >; + pub type HummockStorageIteratorPayloadInner<'a> = MergeIterator< HummockIteratorUnion< Forward, - StagingDataIterator, + SharedBufferBatchIterator, SstableIterator, ConcatIteratorInner, MemTableHummockIterator<'a>, diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 9e54c61588508..df390d565f8bc 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -13,9 +13,8 @@ // limitations under the License. 
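Reviewer note: the `version.rs` changes below collapse the separate `imm` / `sst` queues into a single `Vec<StagingData>` kept in `min_epoch` order, with newer entries at the back and reads iterating in reverse. A minimal sketch of that shape, with the inner types simplified to plain epochs rather than the real `ImmutableMemtable` / `StagingSstableInfo` (the real `prune_overlap` also filters imms by table id and key-range overlap):

```rust
// Sketch only: simplified stand-ins for the real staging types.
#[derive(Clone)]
enum StagingData {
    ImmMem { min_epoch: u64 },
    Sst { epochs: Vec<u64> }, // sorted from lower to higher epoch
}

impl StagingData {
    fn min_epoch(&self) -> u64 {
        match self {
            StagingData::ImmMem { min_epoch } => *min_epoch,
            StagingData::Sst { epochs } => *epochs.first().unwrap(),
        }
    }
}

struct StagingVersion {
    // newer data is pushed to the back, so reads iterate in reverse
    data: Vec<StagingData>,
}

impl StagingVersion {
    fn prune_overlap<'a>(
        &'a self,
        max_epoch_inclusive: u64,
    ) -> impl Iterator<Item = &'a StagingData> + 'a {
        self.data
            .iter()
            .rev()
            .filter(move |d| d.min_epoch() <= max_epoch_inclusive)
    }
}
```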
use std::cmp::Ordering; -use std::collections::vec_deque::VecDeque; use std::collections::HashSet; -use std::iter::once; +use std::ops::Bound; use std::ops::Bound::Included; use std::sync::Arc; use std::time::Instant; @@ -36,23 +35,20 @@ use risingwave_hummock_sdk::table_watermark::{ }; use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::{EpochWithGap, HummockEpoch, LocalSstableInfo}; -use risingwave_pb::hummock::{LevelType, SstableInfo}; +use risingwave_pb::hummock::LevelType; use sync_point::sync_point; -use super::StagingDataIterator; use crate::error::StorageResult; use crate::hummock::iterator::{ConcatIterator, HummockIteratorUnion, MergeIterator, UserIterator}; use crate::hummock::local_version::pinned_version::PinnedVersion; use crate::hummock::sstable::SstableIteratorReadOptions; use crate::hummock::sstable_store::SstableStoreRef; use crate::hummock::utils::{ - check_subset_preserve_order, filter_single_sst, prune_nonoverlapping_ssts, - prune_overlapping_ssts, range_overlap, search_sst_idx, + prune_nonoverlapping_ssts, prune_overlapping_ssts, range_overlap, search_sst_idx, }; use crate::hummock::{ get_from_batch, get_from_sstable_info, hit_sstable_bloom_filter, HummockStorageIterator, - HummockStorageIteratorInner, LocalHummockStorageIterator, ReadVersionTuple, Sstable, - SstableIterator, + HummockStorageIteratorInner, LocalHummockStorageIterator, Sstable, SstableIterator, }; use crate::mem_table::{ImmId, ImmutableMemtable, MemTableHummockIterator}; use crate::monitor::{ @@ -85,8 +81,8 @@ impl StagingSstableInfo { imm_ids: Vec, imm_size: usize, ) -> Self { - // the epochs are sorted from higher epoch to lower epoch - assert!(epochs.is_sorted_by(|epoch1, epoch2| epoch2.partial_cmp(epoch1))); + // the epochs are sorted from lower epoch to higher epoch + assert!(epochs.is_sorted()); Self { sstable_infos, epochs, @@ -115,10 +111,33 @@ impl StagingSstableInfo { #[derive(Clone)] pub enum StagingData { ImmMem(ImmutableMemtable), - MergedImmMem(ImmutableMemtable, Vec), Sst(Arc), } +impl StagingData { + pub fn min_epoch(&self) -> HummockEpoch { + match self { + StagingData::ImmMem(imm) => imm.min_epoch(), + StagingData::Sst(sst) => *sst.epochs.first().unwrap(), + } + } + + pub fn to_imm(&self) -> Option<&ImmutableMemtable> { + match self { + StagingData::ImmMem(imm) => Some(imm), + StagingData::Sst(_) => None, + } + } + + pub fn max_epoch(&self) -> HummockEpoch { + match self { + StagingData::ImmMem(imm) => imm.max_epoch(), + StagingData::Sst(sst) => *sst.epochs.last().unwrap(), + } + } +} +pub type ReadVersionTuple = (Vec, CommittedVersion); + pub enum VersionUpdate { /// a new staging data entry will be added. Staging(StagingData), @@ -133,13 +152,8 @@ pub enum VersionUpdate { #[derive(Clone)] pub struct StagingVersion { - // newer data comes first - // Note: Currently, building imm and writing to staging version is not atomic, and therefore - // imm of smaller batch id may be added later than one with greater batch id - pub imm: VecDeque, - - // newer data comes first - pub sst: VecDeque>, + // newer data comes in last, so we need to reverse them in read. 
+ pub data: Vec, } impl StagingVersion { @@ -150,48 +164,33 @@ impl StagingVersion { max_epoch_inclusive: HummockEpoch, table_id: TableId, table_key_range: &'a TableKeyRange, - ) -> ( - impl Iterator + 'a, - impl Iterator + 'a, - ) { + ) -> impl Iterator + 'a { let (ref left, ref right) = table_key_range; let left = left.as_ref().map(|key| TableKey(key.0.as_ref())); let right = right.as_ref().map(|key| TableKey(key.0.as_ref())); - let overlapped_imms = self.imm.iter().filter(move |imm| { + // reverse to make the newer data come first. + self.data.iter().rev().filter(move |data| { // retain imm which is overlapped with (min_epoch_exclusive, max_epoch_inclusive] - imm.min_epoch() <= max_epoch_inclusive - && imm.table_id == table_id - && range_overlap( - &(left, right), - &imm.start_table_key(), - Included(&imm.end_table_key()), - ) - }); - - // TODO: Remove duplicate sst based on sst id - let overlapped_ssts = self - .sst - .iter() - .filter(move |staging_sst| { - let sst_max_epoch = *staging_sst.epochs.last().expect("epochs not empty"); - sst_max_epoch <= max_epoch_inclusive - }) - .flat_map(move |staging_sst| { - // TODO: sstable info should be concat-able after each streaming table owns a read - // version. May use concat sstable iter instead in some cases. - staging_sst - .sstable_infos - .iter() - .map(|sstable| &sstable.sst_info) - .filter(move |sstable: &&SstableInfo| { - filter_single_sst(sstable, table_id, table_key_range) - }) - }); - (overlapped_imms, overlapped_ssts) + match data { + StagingData::ImmMem(imm) => { + imm.min_epoch() <= max_epoch_inclusive + && imm.table_id == table_id + && range_overlap( + &(left, right), + &imm.start_table_key(), + Included(&imm.end_table_key()), + ) + } + StagingData::Sst(staging_sst) => { + let sst_min_epoch = *staging_sst.epochs.first().expect("epochs not empty"); + sst_min_epoch <= max_epoch_inclusive + } + } + }) } pub fn is_empty(&self) -> bool { - self.imm.is_empty() && self.sst.is_empty() + self.data.is_empty() } } @@ -237,8 +236,7 @@ impl HummockReadVersion { .get(&table_id) .cloned(), staging: StagingVersion { - imm: VecDeque::default(), - sst: VecDeque::default(), + data: Vec::default(), }, committed: committed_version, @@ -275,15 +273,7 @@ impl HummockReadVersion { // TODO: add a check to ensure that the added batch id of added imm is greater than // the batch id of imm at the front StagingData::ImmMem(imm) => { - if let Some(item) = self.staging.imm.front() { - // check batch_id order from newest to old - debug_assert!(item.batch_id() < imm.batch_id()); - } - - self.staging.imm.push_front(imm) - } - StagingData::MergedImmMem(merged_imm, imm_ids) => { - self.add_merged_imm(merged_imm, imm_ids); + self.staging.data.push(StagingData::ImmMem(imm)); } StagingData::Sst(staging_sst_ref) => { // The following properties must be ensured: @@ -296,62 +286,33 @@ impl HummockReadVersion { // are always the suffix of self.staging.imm // Check 1) - debug_assert!(self + // Calculate intersection + let staging_imm_ids_from_imms: HashSet = self .staging - .imm + .data .iter() - .rev() - .is_sorted_by_key(|imm| imm.batch_id())); - - // Calculate intersection - let staging_imm_ids_from_imms: HashSet = - self.staging.imm.iter().map(|imm| imm.batch_id()).collect(); + .filter_map(|data| data.to_imm().map(|imm| imm.batch_id())) + .collect(); // intersected batch_id order from oldest to newest - let intersect_imm_ids = staging_sst_ref + let intersect_imm_ids: HashSet = staging_sst_ref .imm_ids .iter() .rev() .copied() .filter(|id| 
staging_imm_ids_from_imms.contains(id)) - .collect_vec(); + .collect(); if !intersect_imm_ids.is_empty() { // Check 2) - debug_assert!(check_subset_preserve_order( - intersect_imm_ids.iter().copied(), - self.staging.imm.iter().map(|imm| imm.batch_id()).rev(), - )); - - // Check 3) and replace imms with a staging sst - for imm_id in &intersect_imm_ids { - if let Some(imm) = self.staging.imm.back() { - if *imm_id == imm.batch_id() { - self.staging.imm.pop_back(); - } - } else { - let local_imm_ids = self - .staging - .imm - .iter() - .map(|imm| imm.batch_id()) - .collect_vec(); - - unreachable!( - "should not reach here staging_sst.size {}, - staging_sst.imm_ids {:?}, - staging_sst.epochs {:?}, - local_imm_ids {:?}, - intersect_imm_ids {:?}", - staging_sst_ref.imm_size, - staging_sst_ref.imm_ids, - staging_sst_ref.epochs, - local_imm_ids, - intersect_imm_ids, - ); + self.staging.data.retain(|data| match data { + StagingData::ImmMem(imm) => { + !intersect_imm_ids.contains(&imm.batch_id()) } - } - self.staging.sst.push_front(staging_sst_ref); + StagingData::Sst(_) => true, + }); + self.staging.data.push(StagingData::Sst(staging_sst_ref)); + self.staging.data.sort_by_key(|data| data.min_epoch()); } } }, @@ -367,17 +328,8 @@ impl HummockReadVersion { { // TODO: remove it when support update staging local_sst self.staging - .imm + .data .retain(|imm| imm.min_epoch() > max_committed_epoch); - - self.staging.sst.retain(|sst| { - sst.epochs.first().expect("epochs not empty") > &max_committed_epoch - }); - - // check epochs.last() > MCE - assert!(self.staging.sst.iter().all(|sst| { - sst.epochs.last().expect("epochs not empty") > &max_committed_epoch - })); } if let Some(committed_watermarks) = @@ -421,92 +373,6 @@ impl HummockReadVersion { } } - /// `imm_ids` is the list of imm ids that are merged into this batch - /// This field is immutable. Larger imm id at the front. - pub fn add_merged_imm(&mut self, merged_imm: ImmutableMemtable, imm_ids: Vec) { - assert!(imm_ids.iter().rev().is_sorted()); - let min_imm_id = *imm_ids.last().expect("non-empty"); - - let back = self.staging.imm.back().expect("should not be empty"); - - // pop and save imms that are written earlier than the oldest imm if there is any - let earlier_imms = if back.batch_id() < min_imm_id { - let mut earlier_imms = VecDeque::with_capacity(self.staging.imm.len()); - loop { - let batch_id = self - .staging - .imm - .back() - .expect("should not be empty") - .batch_id(); - match batch_id.cmp(&min_imm_id) { - Ordering::Less => { - let imm = self.staging.imm.pop_back().unwrap(); - earlier_imms.push_front(imm); - } - Ordering::Equal => { - break; - } - Ordering::Greater => { - let remaining_staging_imm_ids = self - .staging - .imm - .iter() - .map(|imm| imm.batch_id()) - .collect_vec(); - let earlier_imm_ids = - earlier_imms.iter().map(|imm| imm.batch_id()).collect_vec(); - - unreachable!( - "must have break in equal: {:?} {:?} {:?}", - remaining_staging_imm_ids, earlier_imm_ids, imm_ids - ) - } - } - } - Some(earlier_imms) - } else { - assert_eq!( - back.batch_id(), - min_imm_id, - "{:?} {:?}", - { - self.staging - .imm - .iter() - .map(|imm| imm.batch_id()) - .collect_vec() - }, - imm_ids - ); - None - }; - - // iter from smaller imm and take the older imm at the back. 
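Reviewer note: with a single `data` vector, installing an uploaded SST no longer pops imms off the back of a `VecDeque`: every staged imm whose batch id appears in the SST's `imm_ids` is dropped via `retain`, the SST entry is pushed, and the vector is re-sorted by `min_epoch`. A rough sketch of that replacement step over hypothetical `(min_epoch, batch_id)` pairs rather than the real `StagingData`:

```rust
use std::collections::HashSet;

// Sketch only: `staging` holds (min_epoch, Some(batch_id)) for imms and (min_epoch, None)
// for SST entries; the layout is illustrative, not the real `StagingData` type.
fn apply_uploaded_sst(
    staging: &mut Vec<(u64, Option<u64>)>,
    sst_min_epoch: u64,
    sst_imm_ids: &[u64],
) {
    let uploaded: HashSet<u64> = sst_imm_ids.iter().copied().collect();
    // 1. drop every staged imm that the uploaded SST already covers
    staging.retain(|(_, batch_id)| batch_id.map_or(true, |id| !uploaded.contains(&id)));
    // 2. append the SST entry and restore min_epoch order
    staging.push((sst_min_epoch, None));
    staging.sort_by_key(|(min_epoch, _)| *min_epoch);
}
```

The re-sort keeps the "newer data at the back" invariant that `prune_overlap` relies on when it iterates in reverse.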
- for imm_id in imm_ids.iter().rev() { - let imm = self.staging.imm.pop_back().expect("should exist"); - assert_eq!( - imm.batch_id(), - *imm_id, - "{:?} {:?} {}", - { - self.staging - .imm - .iter() - .map(|imm| imm.batch_id()) - .collect_vec() - }, - imm_ids, - imm_id, - ); - } - - self.staging.imm.push_back(merged_imm); - if let Some(earlier_imms) = earlier_imms { - self.staging.imm.extend(earlier_imms); - } - } - pub fn is_replicated(&self) -> bool { self.is_replicated } @@ -538,15 +404,13 @@ pub fn read_filter_for_version( watermark.rewrite_range_with_table_watermark(epoch, &mut table_key_range) } - let (imm_iter, sst_iter) = - read_version_guard - .staging() - .prune_overlap(epoch, table_id, &table_key_range); + let data_iter = read_version_guard + .staging() + .prune_overlap(epoch, table_id, &table_key_range); - let imms = imm_iter.cloned().collect(); - let ssts = sst_iter.cloned().collect(); + let data = data_iter.cloned().collect(); - Ok((table_key_range, (imms, ssts, committed_version))) + Ok((table_key_range, (data, committed_version))) } #[derive(Clone)] @@ -588,7 +452,7 @@ impl HummockVersionReader { read_options: ReadOptions, read_version_tuple: ReadVersionTuple, ) -> StorageResult> { - let (imms, uncommitted_ssts, committed_version) = read_version_tuple; + let (data, committed_version) = read_version_tuple; let min_epoch = gen_min_epoch(epoch, read_options.retention_seconds.as_ref()); let mut stats_guard = @@ -597,28 +461,6 @@ impl HummockVersionReader { local_stats.found_key = true; // 1. read staging data - for imm in &imms { - // skip imm that only holding out-of-date data - if imm.max_epoch() < min_epoch { - continue; - } - - local_stats.staging_imm_get_count += 1; - - if let Some((data, data_epoch)) = get_from_batch( - imm, - TableKey(table_key.as_ref()), - epoch, - &read_options, - local_stats, - ) { - return Ok(if data_epoch.pure_epoch() < min_epoch { - None - } else { - data.into_user_value() - }); - } - } // 2. order guarantee: imm -> sst let dist_key_hash = read_options.prefix_hint.as_ref().map(|dist_key| { @@ -632,23 +474,57 @@ impl HummockVersionReader { TableKey(table_key.clone()), EpochWithGap::new(epoch, MAX_SPILL_TIMES), ); - for local_sst in &uncommitted_ssts { + let single_user_key_range_ref = ( + Bound::Included(full_key.user_key.as_ref()), + Bound::Included(full_key.user_key.as_ref()), + ); + for staging_data in &data { local_stats.staging_sst_get_count += 1; - if let Some((data, data_epoch)) = get_from_sstable_info( - self.sstable_store.clone(), - local_sst, - full_key.to_ref(), - &read_options, - dist_key_hash, - local_stats, - ) - .await? - { - return Ok(if data_epoch.pure_epoch() < min_epoch { - None - } else { - data.into_user_value() - }); + if staging_data.max_epoch() < min_epoch { + continue; + } + match staging_data { + StagingData::ImmMem(imm) => { + local_stats.staging_imm_get_count += 1; + if let Some((data, data_epoch)) = get_from_batch( + imm, + full_key.user_key.table_key.to_ref(), + epoch, + &read_options, + local_stats, + ) { + return Ok(if data_epoch.pure_epoch() < min_epoch { + None + } else { + data.into_user_value() + }); + } + } + StagingData::Sst(info) => { + let sstable_infos = prune_overlapping_ssts( + info.sstable_infos.iter().map(|info| &info.sst_info), + read_options.table_id.table_id(), + single_user_key_range_ref, + ); + for sst in sstable_infos { + if let Some((data, data_epoch)) = get_from_sstable_info( + self.sstable_store.clone(), + sst, + full_key.to_ref(), + &read_options, + dist_key_hash, + local_stats, + ) + .await? 
+ { + return Ok(if data_epoch.pure_epoch() < min_epoch { + None + } else { + data.into_user_value() + }); + } + } + } } } @@ -663,11 +539,10 @@ impl HummockVersionReader { match level.level_type() { LevelType::Overlapping | LevelType::Unspecified => { - let single_table_key_range = table_key.clone()..=table_key.clone(); let sstable_infos = prune_overlapping_ssts( - &level.table_infos, - read_options.table_id, - &single_table_key_range, + level.table_infos.iter(), + read_options.table_id.table_id(), + single_user_key_range_ref, ); for sstable_info in sstable_infos { local_stats.overlapping_get_count += 1; @@ -753,7 +628,7 @@ impl HummockVersionReader { table_key_range: TableKeyRange, epoch: u64, read_options: ReadOptions, - read_version_tuple: (Vec, Vec, CommittedVersion), + read_version_tuple: (Vec, CommittedVersion), memtable_iter: MemTableHummockIterator<'a>, ) -> StorageResult> { self.iter_inner( @@ -774,14 +649,9 @@ impl HummockVersionReader { read_version_tuple: ReadVersionTuple, mem_table: Option>, ) -> StorageResult> { - let (imms, uncommitted_ssts, committed) = read_version_tuple; + let (staging_data, committed) = read_version_tuple; let mut local_stats = StoreLocalStatistic::default(); - let mut staging_iters = Vec::with_capacity(imms.len() + uncommitted_ssts.len()); - local_stats.staging_imm_iter_count = imms.len() as u64; - for imm in imms { - staging_iters.push(HummockIteratorUnion::First(imm.into_forward_iter())); - } // 2. build iterator from committed // Because SST meta records encoded key range, @@ -791,7 +661,6 @@ impl HummockVersionReader { user_key_range.0.as_ref().map(UserKey::as_ref), user_key_range.1.as_ref().map(UserKey::as_ref), ); - let mut staging_sst_iter_count = 0; // encode once let bloom_filter_prefix_hash = read_options .prefix_hint @@ -804,35 +673,44 @@ impl HummockVersionReader { sst_read_options.max_preload_retry_times = self.preload_retry_times; } let sst_read_options = Arc::new(sst_read_options); - for sstable_info in &uncommitted_ssts { - let table_holder = self - .sstable_store - .sstable(sstable_info, &mut local_stats) - .await?; - - if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() { - if !hit_sstable_bloom_filter( - &table_holder, - &user_key_range_ref, - *prefix_hash, - &mut local_stats, - ) { - continue; + let mut sub_iters = Vec::with_capacity(staging_data.len()); + for data in staging_data { + match data { + StagingData::ImmMem(imm) => { + sub_iters.push(HummockIteratorUnion::First(imm.into_forward_iter())) + } + StagingData::Sst(info) => { + let table_infos = prune_overlapping_ssts( + info.sstable_infos.iter().map(|sst| &sst.sst_info), + read_options.table_id.table_id(), + user_key_range_ref, + ); + for sstable_info in table_infos { + let table_holder = self + .sstable_store + .sstable(sstable_info, &mut local_stats) + .await?; + + if let Some(prefix_hash) = bloom_filter_prefix_hash.as_ref() { + if !hit_sstable_bloom_filter( + &table_holder, + &user_key_range_ref, + *prefix_hash, + &mut local_stats, + ) { + continue; + } + } + sub_iters.push(HummockIteratorUnion::Second(SstableIterator::new( + table_holder, + self.sstable_store.clone(), + sst_read_options.clone(), + ))); + } } } - - staging_sst_iter_count += 1; - staging_iters.push(HummockIteratorUnion::Second(SstableIterator::new( - table_holder, - self.sstable_store.clone(), - sst_read_options.clone(), - ))); } - local_stats.staging_sst_iter_count = staging_sst_iter_count; - let staging_iter: StagingDataIterator = MergeIterator::new(staging_iters); - - let mut 
non_overlapping_iters = Vec::new(); - let mut overlapping_iters = Vec::new(); + local_stats.staging_sst_iter_count = sub_iters.len() as u64; let timer = Instant::now(); for level in committed.levels(read_options.table_id) { @@ -855,11 +733,11 @@ impl HummockVersionReader { continue; } if sstables.len() > 1 { - non_overlapping_iters.push(ConcatIterator::new( + sub_iters.push(HummockIteratorUnion::Third(ConcatIterator::new( sstables, self.sstable_store.clone(), sst_read_options.clone(), - )); + ))); local_stats.non_overlapping_iter_count += 1; } else { let sstable = self @@ -882,21 +760,22 @@ impl HummockVersionReader { // We put the SstableIterator in `overlapping_iters` just for convenience since // it overlaps with SSTs in other levels. In metrics reporting, we still count // it in `non_overlapping_iter_count`. - overlapping_iters.push(SstableIterator::new( + sub_iters.push(HummockIteratorUnion::Second(SstableIterator::new( sstable, self.sstable_store.clone(), sst_read_options.clone(), - )); + ))); + local_stats.non_overlapping_iter_count += 1; } } else { let table_infos = prune_overlapping_ssts( - &level.table_infos, - read_options.table_id, - &table_key_range, + level.table_infos.iter().rev(), + read_options.table_id.table_id(), + user_key_range_ref, ); // Overlapping - let fetch_meta_req = table_infos.rev().collect_vec(); + let fetch_meta_req = table_infos.collect_vec(); if fetch_meta_req.is_empty() { continue; } @@ -916,11 +795,11 @@ impl HummockVersionReader { continue; } } - overlapping_iters.push(SstableIterator::new( + sub_iters.push(HummockIteratorUnion::Second(SstableIterator::new( sstable, self.sstable_store.clone(), sst_read_options.clone(), - )); + ))); local_stats.overlapping_iter_count += 1; } } @@ -934,22 +813,12 @@ impl HummockVersionReader { .iter_slow_fetch_meta_cache_unhits .set(local_stats.cache_meta_block_miss as i64); } + if let Some(iter) = mem_table { + sub_iters.push(HummockIteratorUnion::Fourth(iter)); + } // 3. build user_iterator - let merge_iter = MergeIterator::new( - once(HummockIteratorUnion::First(staging_iter)) - .chain( - overlapping_iters - .into_iter() - .map(HummockIteratorUnion::Second), - ) - .chain( - non_overlapping_iters - .into_iter() - .map(HummockIteratorUnion::Third), - ) - .chain(mem_table.into_iter().map(HummockIteratorUnion::Fourth)), - ); + let merge_iter = MergeIterator::new(sub_iters); let user_key_range = ( user_key_range.0.map(|key| key.cloned()), @@ -992,16 +861,11 @@ impl HummockVersionReader { } let table_id = read_options.table_id; - let (imms, uncommitted_ssts, committed_version) = read_version_tuple; + let (data, committed_version) = read_version_tuple; let mut stats_guard = MayExistLocalMetricsGuard::new(self.state_store_metrics.clone(), table_id); // 1. check staging data - for imm in &imms { - if imm.range_exists(&table_key_range) { - return Ok(true); - } - } let user_key_range = bound_table_key_range(read_options.table_id, &table_key_range); let user_key_range_ref = ( @@ -1013,17 +877,28 @@ impl HummockVersionReader { } else { // only use `table_key_range` to see whether all SSTs are filtered out // without looking at bloom filter because prefix_hint is not provided - if !uncommitted_ssts.is_empty() { - // uncommitted_ssts is already pruned by `table_key_range` so no extra check is - // needed. 
- return Ok(true); + for staing_data in &data { + if let StagingData::ImmMem(imm) = staing_data { + if imm.range_exists(&table_key_range) { + return Ok(true); + } + } else { + // uncommitted_ssts is already pruned by `table_key_range` so no extra check is + // needed. + return Ok(true); + } } + for level in committed_version.levels(table_id) { match level.level_type() { LevelType::Overlapping | LevelType::Unspecified => { - if prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range) - .next() - .is_some() + if prune_overlapping_ssts( + level.table_infos.iter(), + read_options.table_id.table_id(), + user_key_range_ref, + ) + .next() + .is_some() { return Ok(true); } @@ -1042,18 +917,34 @@ impl HummockVersionReader { }; // 2. order guarantee: imm -> sst - for local_sst in &uncommitted_ssts { - stats_guard.local_stats.may_exist_check_sstable_count += 1; - if hit_sstable_bloom_filter( - self.sstable_store - .sstable(local_sst, &mut stats_guard.local_stats) - .await? - .as_ref(), - &user_key_range_ref, - bloom_filter_prefix_hash, - &mut stats_guard.local_stats, - ) { - return Ok(true); + for staging_data in &data { + match staging_data { + StagingData::ImmMem(imm) => { + if imm.range_exists(&table_key_range) { + return Ok(true); + } + } + StagingData::Sst(local_ssts) => { + let sstable_infos = prune_overlapping_ssts( + local_ssts.sstable_infos.iter().map(|info| &info.sst_info), + read_options.table_id.table_id(), + user_key_range_ref, + ); + for local_sst in sstable_infos { + stats_guard.local_stats.may_exist_check_sstable_count += 1; + if hit_sstable_bloom_filter( + self.sstable_store + .sstable(local_sst, &mut stats_guard.local_stats) + .await? + .as_ref(), + &user_key_range_ref, + bloom_filter_prefix_hash, + &mut stats_guard.local_stats, + ) { + return Ok(true); + } + } + } } } @@ -1067,8 +958,11 @@ impl HummockVersionReader { } match level.level_type() { LevelType::Overlapping | LevelType::Unspecified => { - let sstable_infos = - prune_overlapping_ssts(&level.table_infos, table_id, &table_key_range); + let sstable_infos = prune_overlapping_ssts( + level.table_infos.iter(), + read_options.table_id.table_id(), + user_key_range_ref, + ); for sstable_info in sstable_infos { stats_guard.local_stats.may_exist_check_sstable_count += 1; if hit_sstable_bloom_filter( diff --git a/src/storage/src/hummock/utils.rs b/src/storage/src/hummock/utils.rs index 82af4beadf761..6afcfe54c983a 100644 --- a/src/storage/src/hummock/utils.rs +++ b/src/storage/src/hummock/utils.rs @@ -23,9 +23,7 @@ use std::time::Duration; use bytes::Bytes; use foyer::memory::CacheContext; use risingwave_common::catalog::{TableId, TableOption}; -use risingwave_hummock_sdk::key::{ - bound_table_key_range, EmptySliceRef, FullKey, TableKey, UserKey, -}; +use risingwave_hummock_sdk::key::{FullKey, TableKey, UserKey, UserKeyRangeRef}; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{can_concat, HummockEpoch}; use risingwave_pb::hummock::SstableInfo; @@ -97,29 +95,23 @@ pub fn validate_table_key_range(version: &HummockVersion) { } } -pub fn filter_single_sst(info: &SstableInfo, table_id: TableId, table_key_range: &R) -> bool -where - R: RangeBounds>, - B: AsRef<[u8]> + EmptySliceRef, -{ +pub fn filter_single_sst( + info: &SstableInfo, + table_id: u32, + user_key_range_ref: UserKeyRangeRef<'_>, +) -> bool { let table_range = info.key_range.as_ref().unwrap(); let table_start = FullKey::decode(table_range.left.as_slice()).user_key; let table_end = 
FullKey::decode(table_range.right.as_slice()).user_key; - let (left, right) = bound_table_key_range(table_id, table_key_range); - let left: Bound> = left.as_ref().map(|key| key.as_ref()); - let right: Bound> = right.as_ref().map(|key| key.as_ref()); range_overlap( - &(left, right), + &user_key_range_ref, &table_start, if table_range.right_exclusive { Bound::Excluded(&table_end) } else { Bound::Included(&table_end) }, - ) && info - .get_table_ids() - .binary_search(&table_id.table_id()) - .is_ok() + ) && info.get_table_ids().binary_search(&table_id).is_ok() } /// Search the SST containing the specified key within a level, using binary search. @@ -134,17 +126,12 @@ pub(crate) fn search_sst_idx(ssts: &[SstableInfo], key: UserKey<&[u8]>) -> usize /// Prune overlapping SSTs that does not overlap with a specific key range or does not overlap with /// a specific table id. Returns the sst ids after pruning. -pub fn prune_overlapping_ssts<'a, R, B>( - ssts: &'a [SstableInfo], - table_id: TableId, - table_key_range: &'a R, -) -> impl DoubleEndedIterator -where - R: RangeBounds>, - B: AsRef<[u8]> + EmptySliceRef, -{ - ssts.iter() - .filter(move |info| filter_single_sst(info, table_id, table_key_range)) +pub fn prune_overlapping_ssts<'a>( + ssts: impl Iterator, + table_id: u32, + user_key_range_ref: UserKeyRangeRef<'a>, +) -> impl Iterator { + ssts.filter(move |info| filter_single_sst(info, table_id, user_key_range_ref)) } /// Prune non-overlapping SSTs that does not overlap with a specific key range or does not overlap
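Reviewer note: `prune_overlapping_ssts` now accepts any iterator of `&SstableInfo`, a raw `u32` table id, and a `UserKeyRangeRef`, so staging SSTs and committed levels share one pruning path. A rough sketch of the same filtering pattern over simplified types (the real overlap test lives in `filter_single_sst`, decodes full keys, and also honors `right_exclusive`; `FakeSst` and `prune_overlapping` here are illustrative names only):

```rust
use std::ops::Bound;

// Sketch only: `FakeSst` stands in for `SstableInfo`.
struct FakeSst {
    table_ids: Vec<u32>, // sorted
    left: Vec<u8>,
    right: Vec<u8>,
}

fn prune_overlapping<'a>(
    ssts: impl Iterator<Item = &'a FakeSst> + 'a,
    table_id: u32,
    key_range: (Bound<&'a [u8]>, Bound<&'a [u8]>),
) -> impl Iterator<Item = &'a FakeSst> + 'a {
    ssts.filter(move |sst| {
        let hits_table = sst.table_ids.binary_search(&table_id).is_ok();
        // Treat bounds as inclusive for simplicity; the real code distinguishes
        // exclusive bounds via `range_overlap`.
        let left_ok = match key_range.1 {
            Bound::Included(k) | Bound::Excluded(k) => sst.left.as_slice() <= k,
            Bound::Unbounded => true,
        };
        let right_ok = match key_range.0 {
            Bound::Included(k) | Bound::Excluded(k) => sst.right.as_slice() >= k,
            Bound::Unbounded => true,
        };
        hits_table && left_ok && right_ok
    })
}
```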