Skip to content

Commit

Permalink
drop delete range by compaction filter
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Jan 10, 2024
1 parent bfa1fbb commit 2b064c0
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 52 deletions.
128 changes: 79 additions & 49 deletions src/storage/src/hummock/compactor/compactor_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,10 @@ where
if !task_config.gc_delete_keys
&& del_iter.is_valid()
&& !is_max_epoch(del_iter.earliest_epoch())
&& !compaction_filter.should_delete(FullKey::from_user_key(
full_key.user_key,
del_iter.earliest_epoch(),
))
{
sst_builder
.add_monotonic_delete(MonotonicDeleteEvent {
Expand Down Expand Up @@ -799,10 +803,16 @@ where
while del_iter.is_valid() && del_iter.key().as_ref().le(&target_extended_user_key) {
let event_key = del_iter.key().to_vec();
del_iter.next().await?;
if !task_config.gc_delete_keys {
let new_epoch = del_iter.earliest_epoch();
if !task_config.gc_delete_keys
&& !compaction_filter.should_delete(FullKey::from_user_key(
event_key.left_user_key.as_ref(),
new_epoch,
))
{
sst_builder
.add_monotonic_delete(MonotonicDeleteEvent {
new_epoch: del_iter.earliest_epoch(),
new_epoch,
event_key,
})
.await?;
Expand Down Expand Up @@ -876,10 +886,12 @@ where
// delete key to represent this.
iter_key.epoch_with_gap =
EpochWithGap::new_from_epoch(earliest_range_delete_which_can_see_iter_key);
sst_builder
.add_full_key(iter_key, HummockValue::Delete, is_new_user_key)
.verbose_instrument_await("add_full_key_delete")
.await?;
if !compaction_filter.should_delete(iter_key) {
sst_builder
.add_full_key(iter_key, HummockValue::Delete, is_new_user_key)
.verbose_instrument_await("add_full_key_delete")
.await?;
}
last_table_stats.total_key_count += 1;
last_table_stats.total_key_size += iter_key.encoded_len() as i64;
last_table_stats.total_value_size += 1;
Expand All @@ -902,6 +914,7 @@ where
let end_key_ref = extended_largest_user_key.as_ref();
while del_iter.is_valid() {
if !end_key_ref.is_empty() && del_iter.key().ge(&end_key_ref) {
// We do not need to check the right bound of the delete-range because the builder would not add it.
sst_builder
.add_monotonic_delete(MonotonicDeleteEvent {
event_key: extended_largest_user_key,
Expand All @@ -912,12 +925,20 @@ where
}
let event_key = del_iter.key().to_vec();
del_iter.next().await?;
sst_builder
.add_monotonic_delete(MonotonicDeleteEvent {
new_epoch: del_iter.earliest_epoch(),
event_key,
})
.await?;
let new_epoch = del_iter.earliest_epoch();
let drop = compaction_filter.should_delete(FullKey::from_user_key(
event_key.left_user_key.as_ref(),
new_epoch,
));
if !drop {
sst_builder
.add_monotonic_delete(MonotonicDeleteEvent {
new_epoch,
event_key,
})
.await?;
}

progress_key_num += 1;
if let Some(task_progress) = task_progress.as_ref()
&& progress_key_num >= PROGRESS_KEY_INTERVAL
Expand Down Expand Up @@ -947,18 +968,22 @@ where

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};

use risingwave_common::catalog::TableId;
use risingwave_hummock_sdk::key::UserKey;
use risingwave_pb::hummock::InputLevel;
use risingwave_hummock_sdk::prost_key_range::KeyRangeExt;
use risingwave_pb::hummock::{InputLevel, PbKeyRange};

use super::*;
use crate::filter_key_extractor::FullKeyFilterKeyExtractor;
use crate::hummock::compactor::StateCleanUpCompactionFilter;
use crate::hummock::iterator::test_utils::mock_sstable_store;
use crate::hummock::test_utils::delete_range::create_monotonic_events;
use crate::hummock::test_utils::{default_builder_opt_for_test, gen_test_sstable_impl};
use crate::hummock::{DeleteRangeTombstone, Xor16FilterBuilder};
use crate::hummock::{
DeleteRangeTombstone, SharedComapctorObjectIdManager, Xor16FilterBuilder,
};
use crate::opts::StorageOpts;

#[tokio::test]
async fn test_delete_range_aggregator_with_filter() {
Expand Down Expand Up @@ -1007,48 +1032,53 @@ mod tests {
table_infos: vec![sstable_info_1, sstable_info_2],
}],
existing_table_ids: vec![2],
splits: vec![PbKeyRange::inf()],
watermark: 10,
..Default::default()
};
let mut state_clean_up_filter = StateCleanUpCompactionFilter::new(HashSet::from_iter(
let state_clean_up_filter = StateCleanUpCompactionFilter::new(HashSet::from_iter(
compact_task.existing_table_ids.clone(),
));

let sstable_infos = compact_task
.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.filter(|table_info| {
let table_ids = &table_info.table_ids;
table_ids
.iter()
.any(|table_id| compact_task.existing_table_ids.contains(table_id))
})
.cloned()
.collect_vec();

let mut iter = ForwardMergeRangeIterator::new(HummockEpoch::MAX);
iter.add_concat_iter(sstable_infos, sstable_store);

let ret = CompactionDeleteRangeIterator::new(iter)
.get_tombstone_between(
UserKey::<Bytes>::default().as_ref(),
UserKey::<Bytes>::default().as_ref(),
let mut opts = StorageOpts::default();
opts.share_buffer_compaction_worker_threads_number = 1;
let context = CompactorContext::new_local_compact_context(
Arc::new(opts),
sstable_store.clone(),
Arc::new(CompactorMetrics::unused()),
);
let runner = CompactorRunner::new(
0,
context,
compact_task,
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([5, 6, 7, 8, 9, 10, 11, 12, 13]),
)),
);
let multi_filter_key_extractor =
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor));
let (_, sst_infos, _) = runner
.run(
state_clean_up_filter,
multi_filter_key_extractor,
Arc::new(TaskProgress::default()),
)
.await
.unwrap();
let ret = ret
.into_iter()
.filter(|event| {
!state_clean_up_filter.should_delete(FullKey::from_user_key(
event.event_key.left_user_key.as_ref(),
event.new_epoch,
))
})
.collect_vec();
let sst_infos = sst_infos.into_iter().map(|sst| sst.sst_info).collect_vec();
let mut ret = vec![];
for sst_info in sst_infos {
let sst = sstable_store
.sstable(&sst_info, &mut StoreLocalStatistic::default())
.await
.unwrap();
ret.append(&mut sst.value().meta.monotonic_tombstone_events.clone());
}
let expected_result = create_monotonic_events(vec![range_tombstones[1].clone()]);

assert_eq!(
ret,
create_monotonic_events(vec![range_tombstones[1].clone()])
ret, expected_result,
"{:?} vs {:?}",
ret[0], expected_result[0],
);
}
}
5 changes: 2 additions & 3 deletions src/storage/src/hummock/sstable/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,8 @@ impl<W: SstableWriter, F: FilterBuilder> SstableBuilder<W, F> {
}
if is_max_epoch(event.new_epoch)
&& self.monotonic_deletes.last().map_or(true, |last| {
is_max_epoch(last.new_epoch)
&& last.event_key.left_user_key.table_id
== event.event_key.left_user_key.table_id
last.event_key.left_user_key.table_id != event.event_key.left_user_key.table_id
|| is_max_epoch(last.new_epoch)
})
{
// This range would never delete any key, so we can merge it with the last range.
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/hummock/sstable/multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ where

/// Add a monotonic delete event to sstable.
pub async fn add_monotonic_delete(&mut self, event: MonotonicDeleteEvent) -> HummockResult<()> {
println!("add event key: {:?}, ", event);
if let Some(builder) = self.current_builder.as_mut()
&& builder.reach_capacity()
&& !is_max_epoch(event.new_epoch)
Expand All @@ -311,6 +312,7 @@ where

if self.current_builder.is_none() {
if is_max_epoch(event.new_epoch) {
println!("skip event key: {:?}, ", event);
return Ok(());
}

Expand Down

0 comments on commit 2b064c0

Please sign in to comment.