diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index f651b345b3157..1778e263c54f9 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -707,6 +707,10 @@ where if !task_config.gc_delete_keys && del_iter.is_valid() && !is_max_epoch(del_iter.earliest_epoch()) + && !compaction_filter.should_delete(FullKey::from_user_key( + full_key.user_key, + del_iter.earliest_epoch(), + )) { sst_builder .add_monotonic_delete(MonotonicDeleteEvent { @@ -775,11 +779,17 @@ where while del_iter.is_valid() && del_iter.key().as_ref().le(&target_extended_user_key) { let event_key = del_iter.key().to_vec(); del_iter.next().await?; - if !task_config.gc_delete_keys { + let new_epoch = del_iter.earliest_epoch(); + if !task_config.gc_delete_keys + && !compaction_filter.should_delete(FullKey::from_user_key( + event_key.left_user_key.as_ref(), + new_epoch, + )) + { sst_builder .add_monotonic_delete(MonotonicDeleteEvent { - new_epoch: del_iter.earliest_epoch(), event_key, + new_epoch, }) .await?; } @@ -870,6 +880,7 @@ where let end_key_ref = extended_largest_user_key.as_ref(); while del_iter.is_valid() { if !end_key_ref.is_empty() && del_iter.key().ge(&end_key_ref) { + // We do not need to check right bound of delete-range because build would not add it. sst_builder .add_monotonic_delete(MonotonicDeleteEvent { event_key: extended_largest_user_key, @@ -880,12 +891,19 @@ where } let event_key = del_iter.key().to_vec(); del_iter.next().await?; - sst_builder - .add_monotonic_delete(MonotonicDeleteEvent { - new_epoch: del_iter.earliest_epoch(), - event_key, - }) - .await?; + let new_epoch = del_iter.earliest_epoch(); + let drop = compaction_filter.should_delete(FullKey::from_user_key( + event_key.left_user_key.as_ref(), + new_epoch, + )); + if !drop { + sst_builder + .add_monotonic_delete(MonotonicDeleteEvent { + event_key, + new_epoch, + }) + .await?; + } } } if let Some(last_table_id) = last_table_id.take() { @@ -900,58 +918,75 @@ where #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashSet, VecDeque}; use risingwave_common::catalog::TableId; - use risingwave_hummock_sdk::key::UserKey; - use risingwave_pb::hummock::InputLevel; + use risingwave_hummock_sdk::key::{TableKey, UserKey}; + use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; + use risingwave_pb::hummock::{InputLevel, PbKeyRange}; use super::*; + use crate::filter_key_extractor::FullKeyFilterKeyExtractor; use crate::hummock::compactor::StateCleanUpCompactionFilter; use crate::hummock::iterator::test_utils::mock_sstable_store; use crate::hummock::test_utils::delete_range::create_monotonic_events; use crate::hummock::test_utils::{default_builder_opt_for_test, gen_test_sstable_impl}; - use crate::hummock::{DeleteRangeTombstone, Xor16FilterBuilder}; + use crate::hummock::{ + DeleteRangeTombstone, SharedComapctorObjectIdManager, Xor16FilterBuilder, + }; + use crate::opts::StorageOpts; #[tokio::test] async fn test_delete_range_aggregator_with_filter() { let sstable_store = mock_sstable_store(); let kv_pairs = vec![]; - let range_tombstones = vec![ - DeleteRangeTombstone::new_for_test( - TableId::new(1), - b"abc".to_vec(), - b"cde".to_vec(), - 1, - ), - DeleteRangeTombstone::new_for_test( - TableId::new(2), - b"abc".to_vec(), - b"def".to_vec(), - 1, - ), - ]; + let mut sstable_info_1 = gen_test_sstable_impl::( default_builder_opt_for_test(), 1, kv_pairs.clone().into_iter(), - range_tombstones.clone(), + vec![ + DeleteRangeTombstone::new_for_test( + TableId::new(1), + b"abc".to_vec(), + b"ccc".to_vec(), + 2, + ), + DeleteRangeTombstone::new_for_test( + TableId::new(1), + b"ddd".to_vec(), + b"eee".to_vec(), + 2, + ), + ], sstable_store.clone(), CachePolicy::NotFill, ) .await; sstable_info_1.table_ids = vec![1]; - + let tombstone = DeleteRangeTombstone::new_for_test( + TableId::new(2), + b"abc".to_vec(), + b"def".to_vec(), + 1, + ); let mut sstable_info_2 = gen_test_sstable_impl::( default_builder_opt_for_test(), 2, - kv_pairs.into_iter(), - range_tombstones.clone(), + vec![( + FullKey::from_user_key( + UserKey::new(TableId::new(1), TableKey(Bytes::copy_from_slice(b"bbb"))), + 1, + ), + HummockValue::put(Bytes::copy_from_slice(b"value")), + )] + .into_iter(), + vec![tombstone.clone()], sstable_store.clone(), CachePolicy::NotFill, ) .await; - sstable_info_2.table_ids = vec![2]; + sstable_info_2.table_ids = vec![1, 2]; let compact_task = CompactTask { input_ssts: vec![InputLevel { @@ -960,48 +995,55 @@ mod tests { table_infos: vec![sstable_info_1, sstable_info_2], }], existing_table_ids: vec![2], + splits: vec![PbKeyRange::inf()], + watermark: 10, ..Default::default() }; - let mut state_clean_up_filter = StateCleanUpCompactionFilter::new(HashSet::from_iter( + let state_clean_up_filter = StateCleanUpCompactionFilter::new(HashSet::from_iter( compact_task.existing_table_ids.clone(), )); - - let sstable_infos = compact_task - .input_ssts - .iter() - .flat_map(|level| level.table_infos.iter()) - .filter(|table_info| { - let table_ids = &table_info.table_ids; - table_ids - .iter() - .any(|table_id| compact_task.existing_table_ids.contains(table_id)) - }) - .cloned() - .collect_vec(); - - let mut iter = ForwardMergeRangeIterator::new(HummockEpoch::MAX); - iter.add_concat_iter(sstable_infos, sstable_store); - - let ret = CompactionDeleteRangeIterator::new(iter) - .get_tombstone_between( - UserKey::::default().as_ref(), - UserKey::::default().as_ref(), + let opts = StorageOpts { + share_buffer_compaction_worker_threads_number: 1, + ..Default::default() + }; + let context = CompactorContext::new_local_compact_context( + Arc::new(opts), + sstable_store.clone(), + Arc::new(CompactorMetrics::unused()), + ); + let runner = CompactorRunner::new( + 0, + context, + compact_task, + Box::new(SharedComapctorObjectIdManager::for_test( + VecDeque::from_iter([5, 6, 7, 8, 9, 10, 11, 12, 13]), + )), + ); + let multi_filter_key_extractor = + Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)); + let (_, sst_infos, _) = runner + .run( + state_clean_up_filter, + multi_filter_key_extractor, + Arc::new(TaskProgress::default()), ) .await .unwrap(); - let ret = ret - .into_iter() - .filter(|event| { - !state_clean_up_filter.should_delete(FullKey::from_user_key( - event.event_key.left_user_key.as_ref(), - event.new_epoch, - )) - }) - .collect_vec(); + let sst_infos = sst_infos.into_iter().map(|sst| sst.sst_info).collect_vec(); + let mut ret = vec![]; + for sst_info in sst_infos { + let sst = sstable_store + .sstable(&sst_info, &mut StoreLocalStatistic::default()) + .await + .unwrap(); + ret.append(&mut sst.value().meta.monotonic_tombstone_events.clone()); + } + let expected_result = create_monotonic_events(vec![tombstone]); assert_eq!( - ret, - create_monotonic_events(vec![range_tombstones[1].clone()]) + ret, expected_result, + "{:?} vs {:?}", + ret[0], expected_result[0], ); } } diff --git a/src/storage/src/hummock/sstable/builder.rs b/src/storage/src/hummock/sstable/builder.rs index 9202b3ec28788..997937b9aaf6f 100644 --- a/src/storage/src/hummock/sstable/builder.rs +++ b/src/storage/src/hummock/sstable/builder.rs @@ -200,12 +200,13 @@ impl SstableBuilder { } if is_max_epoch(event.new_epoch) && self.monotonic_deletes.last().map_or(true, |last| { - is_max_epoch(last.new_epoch) - && last.event_key.left_user_key.table_id - == event.event_key.left_user_key.table_id + last.event_key.left_user_key.table_id != event.event_key.left_user_key.table_id + || is_max_epoch(last.new_epoch) }) { - // This range would never delete any key so we can merge it with last range. + // There are two case we shall skip the right end of delete-range. + // 1, it belongs the same table-id with the last event, and the last event is also right-end of some delete-range so we can merge them into one point. + // 2, this point does not belong the same table-id with the last event. It means that the left end of this delete-range may be dropped, so we can not add it. return; } if !is_max_epoch(event.new_epoch) {