Skip to content

Commit

Permalink
fix(meta): fix pick one sub-level cause dead-loop (#16069) (#16112)
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
Co-authored-by: Wallace <[email protected]>
  • Loading branch information
github-actions[bot] and Little-Wallace authored Apr 3, 2024
1 parent 454489f commit f4c4ab0
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ struct IntraCompactionTaskValidationRule {

impl CompactionTaskValidationRule for IntraCompactionTaskValidationRule {
fn validate(&self, input: &CompactionInput, stats: &mut LocalPickerStatistic) -> bool {
if input.total_file_count >= self.config.level0_max_compact_file_number
if (input.total_file_count >= self.config.level0_max_compact_file_number
&& input.input_levels.len() > 1)
|| input.input_levels.len() >= MAX_COMPACT_LEVEL_COUNT
{
return true;
Expand Down
34 changes: 31 additions & 3 deletions src/meta/src/hummock/compaction/picker/intra_compaction_picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,10 @@ impl WholeLevelCompactionPicker {
let mut total_file_count = 0;
let mut wait_enough = false;
for next_level in l0.sub_levels.iter().skip(idx) {
if select_input_size > max_compaction_bytes
if (select_input_size > max_compaction_bytes
|| total_file_count > self.config.level0_max_compact_file_number
|| (next_level.vnode_partition_count == partition_count
&& select_level_inputs.len() > 1)
|| next_level.vnode_partition_count == partition_count)
&& select_level_inputs.len() > 1
{
wait_enough = true;
break;
Expand Down Expand Up @@ -764,4 +764,32 @@ pub mod tests {
assert!(is_l0_trivial_move(&ret));
assert_eq!(ret.input_levels[0].table_infos.len(), 1);
}
#[test]
fn test_pick_whole_level() {
let config = Arc::new(
CompactionConfigBuilder::new()
.level0_max_compact_file_number(20)
.build(),
);
let mut table_infos = vec![];
for epoch in 1..3 {
let base = epoch * 100;
let mut ssts = vec![];
for i in 1..50 {
let left = (i as usize) * 100;
let right = left + 100;
ssts.push(generate_table(base + i, 1, left, right, epoch));
}
table_infos.push(ssts);
}

let l0 = generate_l0_nonoverlapping_multi_sublevels(table_infos);
let compaction_task_validator = Arc::new(CompactionTaskValidator::new(config.clone()));
let picker = WholeLevelCompactionPicker::new(config, compaction_task_validator);
let level_handler = LevelHandler::new(0);
let ret = picker
.pick_whole_level(&l0, &level_handler, 4, &mut LocalPickerStatistic::default())
.unwrap();
assert_eq!(ret.input_levels.len(), 2);
}
}
7 changes: 6 additions & 1 deletion src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,12 @@ impl HummockManager {
table_ids: &[StateTableId],
) -> Result<CompactionGroupId> {
let result = self
.move_state_table_to_compaction_group(parent_group_id, table_ids, None, 0)
.move_state_table_to_compaction_group(
parent_group_id,
table_ids,
None,
self.env.opts.partition_vnode_count,
)
.await?;
self.group_to_table_vnode_partition
.write()
Expand Down
113 changes: 113 additions & 0 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use itertools::Itertools;
use prometheus::Registry;
Expand Down Expand Up @@ -2188,3 +2189,115 @@ async fn test_gc_stats() {
);
assert_eq_gc_stats(6, 3, 0, 0, 2, 4);
}

#[tokio::test]
async fn test_partition_level() {
let config = CompactionConfigBuilder::new()
.level0_tier_compact_file_number(3)
.level0_sub_level_compact_level_count(3)
.level0_overlapping_sub_level_compact_level_count(3)
.build();
let registry = Registry::new();
let (_env, hummock_manager, _, worker_node) =
setup_compute_env_with_metric(80, config.clone(), Some(MetaMetrics::for_test(&registry)))
.await;
let config = Arc::new(config);

let context_id = worker_node.id;

hummock_manager
.register_table_ids(&[(100, 2)])
.await
.unwrap();
hummock_manager
.register_table_ids(&[(101, 2)])
.await
.unwrap();
let sst_1 = gen_extend_sstable_info(10, 2, 1, vec![100, 101]);
hummock_manager
.commit_epoch(
30,
CommitEpochInfo::for_test(vec![sst_1.clone()], HashMap::from([(10, context_id)])),
)
.await
.unwrap();
// Construct data via manual compaction
let compaction_task = get_manual_compact_task(&hummock_manager, context_id).await;
let base_level: usize = 6;
assert_eq!(compaction_task.input_ssts[0].table_infos.len(), 1);
assert_eq!(compaction_task.target_level, base_level as u32);
assert!(hummock_manager
.report_compact_task(
compaction_task.task_id,
TaskStatus::Success,
vec![
gen_sstable_info(11, 1, vec![100]),
gen_sstable_info(12, 2, vec![101]),
],
None,
)
.await
.unwrap());

hummock_manager
.split_compaction_group(2, &[100])
.await
.unwrap();
let current_version = hummock_manager.get_current_version().await;
let new_group_id = current_version.levels.keys().max().cloned().unwrap();
assert_eq!(
current_version
.get_compaction_group_levels(new_group_id)
.levels[base_level - 1]
.table_infos
.len(),
1
);
let mut global_sst_id = 13;
const MB: u64 = 1024 * 1024;
let mut selector = default_compaction_selector();
for epoch in 31..100 {
let mut sst = gen_extend_sstable_info(global_sst_id, new_group_id, 10, vec![100]);
sst.sst_info.file_size = 10 * MB;
sst.sst_info.uncompressed_file_size = 10 * MB;
hummock_manager
.commit_epoch(
epoch,
CommitEpochInfo::for_test(vec![sst], HashMap::from([(global_sst_id, context_id)])),
)
.await
.unwrap();
global_sst_id += 1;
if let Some(task) = hummock_manager
.get_compact_task(new_group_id, &mut selector)
.await
.unwrap()
{
let mut sst = gen_sstable_info(global_sst_id, 10, vec![100]);
sst.file_size = task
.input_ssts
.iter()
.map(|level| {
level
.table_infos
.iter()
.map(|sst| sst.file_size)
.sum::<u64>()
})
.sum::<u64>();
global_sst_id += 1;
let ret = hummock_manager
.report_compact_task(task.task_id, TaskStatus::Success, vec![sst], None)
.await
.unwrap();
assert!(ret);
}
}
let current_version = hummock_manager.get_current_version().await;
let group = current_version.get_compaction_group_levels(new_group_id);
for sub_level in &group.l0.as_ref().unwrap().sub_levels {
if sub_level.total_file_size > config.sub_level_max_compaction_bytes {
assert!(sub_level.vnode_partition_count > 0);
}
}
}

0 comments on commit f4c4ab0

Please sign in to comment.