Commit c9840ed

fix(compaction): fix failure to pick a compaction task when many files overlap (#12790)

Signed-off-by: Little-Wallace <[email protected]>
Little-Wallace authored Oct 13, 2023
1 parent 369c1f3 commit c9840ed
Showing 4 changed files with 89 additions and 79 deletions.
49 changes: 0 additions & 49 deletions src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
@@ -623,53 +623,4 @@ pub mod tests {
        assert!(is_l0_trivial_move(&ret));
        assert_eq!(ret.input_levels[0].table_infos.len(), 1);
    }
-
-    #[test]
-    fn test_issue_11154() {
-        let mut local_stats = LocalPickerStatistic::default();
-        let mut l0 = generate_l0_overlapping_sublevels(vec![
-            vec![
-                generate_table(4, 1, 1, 200, 1),
-                generate_table(5, 1, 400, 600, 1),
-            ],
-            vec![
-                generate_table(6, 1, 1, 200, 1),
-                generate_table(7, 1, 400, 600, 1),
-            ],
-            vec![
-                generate_table(8, 1, 1, 200, 1),
-                generate_table(9, 1, 400, 600, 1),
-            ],
-            vec![generate_table(10, 1, 1, 600, 1)],
-        ]);
-        // We can set level_type only because the input above is valid.
-        for s in &mut l0.sub_levels {
-            s.level_type = LevelType::Nonoverlapping as i32;
-        }
-        let levels = Levels {
-            l0: Some(l0),
-            levels: vec![generate_level(1, vec![generate_table(3, 1, 0, 100000, 1)])],
-            member_table_ids: vec![1],
-            ..Default::default()
-        };
-        let levels_handler = vec![LevelHandler::new(0), LevelHandler::new(1)];
-
-        // Picking with a large max_compaction_bytes results in all sub-levels being included in the input.
-        let config = Arc::new(
-            CompactionConfigBuilder::new()
-                .max_compaction_bytes(800)
-                .sub_level_max_compaction_bytes(50000)
-                .max_bytes_for_level_base(500000)
-                .level0_sub_level_compact_level_count(1)
-                .build(),
-        );
-        // Including only sub-level 0 would violate MAX_WRITE_AMPLIFICATION,
-        // so all sub-levels are included to keep write amplification below it.
-        let mut picker = IntraCompactionPicker::new(config);
-        let ret = picker
-            .pick_compaction(&levels, &levels_handler, &mut local_stats)
-            .unwrap();
-        // Avoid adding sst_10, which would cause a big task.
-        assert_eq!(3, ret.input_levels.len());
-    }
}
@@ -209,6 +209,13 @@ impl NonOverlapSubLevelPicker {
                break;
            }

+            // more than 1 sub_level
+            if ret.total_file_count > 1 && ret.total_file_size >= self.max_compaction_bytes
+                || ret.total_file_count >= self.max_file_count as usize
+            {
+                break;
+            }
+
            let mut overlap_files_range =
                overlap_info.check_multiple_include(&target_level.table_infos);
            if overlap_files_range.is_empty() {
@@ -288,15 +295,6 @@ impl NonOverlapSubLevelPicker {
                .map(|(_, files)| files.len())
                .sum::<usize>();

-            // more than 1 sub_level
-            if ret.total_file_count > 1
-                && (ret.total_file_size + (add_files_size + current_level_size)
-                    >= self.max_compaction_bytes
-                    || ret.total_file_count + add_files_count >= self.max_file_count as usize)
-            {
-                break;
-            }
-
            if ret
                .sstable_infos
                .iter()
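The two hunks above move the task-size guard from the bottom of the pick loop, where it also charged the files about to be added (add_files_size, add_files_count), to the top, where it counts only files already picked. A minimal self-contained sketch of the new placement, using simplified stand-in types rather than the real NonOverlapSubLevelPicker (the tuple-based sub_levels input and PickedTask struct are illustrative assumptions):

// Sketch only: simplified accumulator standing in for the picker's result type.
#[derive(Default)]
struct PickedTask {
    total_file_count: usize,
    total_file_size: u64,
}

// Each (file_count, file_size) pair stands for one sub-level. The limit check
// runs before a sub-level is merged in and looks only at what is already
// picked, mirroring the relocated condition in the hunk above.
fn pick(sub_levels: &[(usize, u64)], max_compaction_bytes: u64, max_file_count: usize) -> PickedTask {
    let mut ret = PickedTask::default();
    for &(file_count, file_size) in sub_levels {
        if ret.total_file_count > 1 && ret.total_file_size >= max_compaction_bytes
            || ret.total_file_count >= max_file_count
        {
            break;
        }
        ret.total_file_count += file_count;
        ret.total_file_size += file_size;
    }
    ret
}

fn main() {
    // The second sub-level alone exceeds the byte budget, yet it is still picked,
    // because only already-accumulated bytes are counted before the merge.
    let task = pick(&[(2, 100), (40, 10_000), (3, 200)], 1_000, 64);
    assert_eq!(task.total_file_count, 42);
}

Under the old placement the projected additions counted against the limit, so one heavily overlapping sub-level could stop accumulation almost immediately — consistent with the "failure to pick a task" symptom named in the commit title.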
60 changes: 60 additions & 0 deletions src/storage/src/hummock/compactor/compaction_utils.rs
@@ -158,13 +158,73 @@ pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompact
    multi_filter
}

+const MAX_FILE_COUNT: usize = 32;
+
+fn generate_splits_fast(
+    sstable_infos: &Vec<SstableInfo>,
+    compaction_size: u64,
+    context: CompactorContext,
+) -> HummockResult<Vec<KeyRange_vec>> {
+    let worker_num = context.compaction_executor.worker_num();
+    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
+
+    let parallelism = (compaction_size + parallel_compact_size - 1) / parallel_compact_size;
+
+    let parallelism = std::cmp::min(
+        worker_num,
+        std::cmp::min(
+            parallelism as usize,
+            context.storage_opts.max_sub_compaction as usize,
+        ),
+    );
+    let mut indexes = vec![];
+    for sst in sstable_infos {
+        let key_range = sst.key_range.as_ref().unwrap();
+        indexes.push(
+            FullKey {
+                user_key: FullKey::decode(&key_range.left).user_key,
+                epoch: HummockEpoch::MAX,
+            }
+            .encode(),
+        );
+        indexes.push(
+            FullKey {
+                user_key: FullKey::decode(&key_range.right).user_key,
+                epoch: HummockEpoch::MAX,
+            }
+            .encode(),
+        );
+    }
+    indexes.sort_by(|a, b| KeyComparator::compare_encoded_full_key(a.as_ref(), b.as_ref()));
+    indexes.dedup();
+    if indexes.len() <= parallelism {
+        return Ok(vec![]);
+    }
+    let mut splits = vec![];
+    splits.push(KeyRange_vec::new(vec![], vec![]));
+    let parallel_key_count = indexes.len() / parallelism;
+    let mut last_split_key_count = 0;
+    for key in indexes {
+        if last_split_key_count >= parallel_key_count {
+            splits.last_mut().unwrap().right = key.clone();
+            splits.push(KeyRange_vec::new(key.clone(), vec![]));
+            last_split_key_count = 0;
+        }
+        last_split_key_count += 1;
+    }
+    Ok(splits)
+}
+
pub async fn generate_splits(
    sstable_infos: &Vec<SstableInfo>,
    compaction_size: u64,
    context: CompactorContext,
) -> HummockResult<Vec<KeyRange_vec>> {
    let parallel_compact_size = (context.storage_opts.parallel_compact_size_mb as u64) << 20;
    if compaction_size > parallel_compact_size {
+        if sstable_infos.len() > MAX_FILE_COUNT {
+            return generate_splits_fast(sstable_infos, compaction_size, context);
+        }
        let mut indexes = vec![];
        // preload the meta and get the smallest key to split sub_compaction
        for sstable_info in sstable_infos {
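generate_splits_fast sizes its output with two clamps: the ideal split count is the compaction size divided, rounding up, by parallel_compact_size, then capped by both the executor's worker count and max_sub_compaction. A self-contained sketch of just that arithmetic, with assumed example values rather than the repository's defaults:

// Ceiling division: how many parallel_compact_size-sized ranges cover the input.
fn ideal_splits(compaction_size: u64, parallel_compact_size: u64) -> u64 {
    (compaction_size + parallel_compact_size - 1) / parallel_compact_size
}

fn main() {
    // Assumed values: a 3 GiB compaction, 512 MiB per sub-compaction,
    // 8 executor workers, and a max_sub_compaction cap of 4.
    let compaction_size: u64 = 3 << 30;
    let parallel_compact_size: u64 = 512 << 20;
    let worker_num: usize = 8;
    let max_sub_compaction: usize = 4;

    let parallelism = std::cmp::min(
        worker_num,
        std::cmp::min(
            ideal_splits(compaction_size, parallel_compact_size) as usize,
            max_sub_compaction,
        ),
    );
    // 3 GiB / 512 MiB = 6 ideal splits, clamped to 4 by max_sub_compaction.
    assert_eq!(parallelism, 4);
}

The fast path then derives split boundaries from the SSTs' already-known key ranges instead of preloading every SST's metadata, which is what makes it cheap for inputs above MAX_FILE_COUNT.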
43 changes: 22 additions & 21 deletions src/storage/src/hummock/compactor/compactor_runner.rs
@@ -152,17 +152,20 @@ impl CompactorRunner {
        let mut local_stats = StoreLocalStatistic::default();

        for table_info in sstable_infos {
-            let table = sstable_store.sstable(table_info, &mut local_stats).await?;
-            let mut range_tombstone_list = table.value().meta.monotonic_tombstone_events.clone();
-            range_tombstone_list.iter_mut().for_each(|tombstone| {
-                if filter.should_delete(FullKey::from_user_key(
-                    tombstone.event_key.left_user_key.as_ref(),
-                    tombstone.new_epoch,
-                )) {
-                    tombstone.new_epoch = HummockEpoch::MAX;
-                }
-            });
-            builder.add_delete_events(range_tombstone_list);
+            if table_info.range_tombstone_count > 0 {
+                let table = sstable_store.sstable(table_info, &mut local_stats).await?;
+                let mut range_tombstone_list =
+                    table.value().meta.monotonic_tombstone_events.clone();
+                range_tombstone_list.iter_mut().for_each(|tombstone| {
+                    if filter.should_delete(FullKey::from_user_key(
+                        tombstone.event_key.left_user_key.as_ref(),
+                        tombstone.new_epoch,
+                    )) {
+                        tombstone.new_epoch = HummockEpoch::MAX;
+                    }
+                });
+                builder.add_delete_events(range_tombstone_list);
+            }
        }

        let aggregator = builder.build_for_compaction();
@@ -891,10 +894,8 @@ mod tests {
    use super::*;
    use crate::hummock::compactor::StateCleanUpCompactionFilter;
    use crate::hummock::iterator::test_utils::mock_sstable_store;
-    use crate::hummock::test_utils::{
-        default_builder_opt_for_test, gen_test_sstable_with_range_tombstone,
-    };
-    use crate::hummock::{create_monotonic_events, DeleteRangeTombstone};
+    use crate::hummock::test_utils::{default_builder_opt_for_test, gen_test_sstable_impl};
+    use crate::hummock::{create_monotonic_events, DeleteRangeTombstone, Xor16FilterBuilder};

    #[tokio::test]
    async fn test_delete_range_aggregator_with_filter() {
@@ -914,26 +915,26 @@
                1,
            ),
        ];
-        let mut sstable_info_1 = gen_test_sstable_with_range_tombstone(
+        let mut sstable_info_1 = gen_test_sstable_impl::<Bytes, Xor16FilterBuilder>(
            default_builder_opt_for_test(),
            1,
            kv_pairs.clone().into_iter(),
            range_tombstones.clone(),
            sstable_store.clone(),
+            CachePolicy::NotFill,
        )
-        .await
-        .get_sstable_info();
+        .await;
        sstable_info_1.table_ids = vec![1];

-        let mut sstable_info_2 = gen_test_sstable_with_range_tombstone(
+        let mut sstable_info_2 = gen_test_sstable_impl::<Bytes, Xor16FilterBuilder>(
            default_builder_opt_for_test(),
            2,
            kv_pairs.into_iter(),
            range_tombstones.clone(),
            sstable_store.clone(),
+            CachePolicy::NotFill,
        )
-        .await
-        .get_sstable_info();
+        .await;
        sstable_info_2.table_ids = vec![2];

        let compact_task = CompactTask {
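Both compactor_runner.rs hunks gate work on range_tombstone_count, so SSTs without range tombstones skip the metadata fetch entirely. A minimal sketch of that guard pattern; TableInfo and fetch_tombstones here are hypothetical stand-ins, not the real store API:

// Hypothetical stand-ins for illustration only.
struct TombstoneEvent;

struct TableInfo {
    range_tombstone_count: u64,
}

// Placeholder for the expensive per-SSTable metadata load done by the compactor.
fn fetch_tombstones(_table: &TableInfo) -> Vec<TombstoneEvent> {
    vec![TombstoneEvent]
}

fn collect_delete_events(tables: &[TableInfo]) -> Vec<TombstoneEvent> {
    let mut events = Vec::new();
    for table in tables {
        // Only touch the store when the SST actually carries range tombstones,
        // the same check the hunk above adds around the sstable() call.
        if table.range_tombstone_count > 0 {
            events.extend(fetch_tombstones(table));
        }
    }
    events
}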
