diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index 15e7a61f548ee..cf3a4555e18e1 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -43,7 +43,7 @@ use crate::hummock::level_handler::LevelHandler; pub const MAX_COMPACT_LEVEL_COUNT: usize = 42; -#[derive(Default)] +#[derive(Default, Debug)] pub struct LocalPickerStatistic { pub skip_by_write_amp_limit: u64, pub skip_by_count_limit: u64, diff --git a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs index 99b17694f528e..5b3058317a4b0 100644 --- a/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/tier_compaction_picker.rs @@ -14,8 +14,6 @@ use std::sync::Arc; -use risingwave_hummock_sdk::can_concat; -use risingwave_hummock_sdk::prost_key_range::KeyRangeExt; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{CompactionConfig, InputLevel, LevelType, OverlappingLevel}; @@ -69,33 +67,11 @@ impl TierCompactionPicker { continue; } - let mut input_level = InputLevel { + let input_level = InputLevel { level_idx: 0, level_type: level.level_type, table_infos: level.table_infos.clone(), }; - // Since the level is overlapping, we can change the order of origin sstable infos in - // task. - input_level.table_infos.sort_by(|sst1, sst2| { - let a = sst1.key_range.as_ref().unwrap(); - let b = sst2.key_range.as_ref().unwrap(); - a.compare(b) - }); - - if can_concat(&input_level.table_infos) { - return Some(CompactionInput { - select_input_size: input_level - .table_infos - .iter() - .map(|sst| sst.file_size) - .sum(), - total_file_count: input_level.table_infos.len() as u64, - input_levels: vec![input_level], - target_level: 0, - target_sub_level_id: level.sub_level_id, - ..Default::default() - }); - } let mut select_level_inputs = vec![input_level]; @@ -182,7 +158,6 @@ impl CompactionPicker for TierCompactionPicker { pub mod tests { use std::sync::Arc; - use risingwave_hummock_sdk::can_concat; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::new_sub_level; use risingwave_pb::hummock::hummock_version::Levels; use risingwave_pb::hummock::{LevelType, OverlappingLevel}; @@ -281,11 +256,8 @@ pub mod tests { // sub-level 0 is excluded because it's nonoverlapping and violating // sub_level_max_compaction_bytes. let mut picker = TierCompactionPicker::new(config); - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - assert_eq!(ret.input_levels.len(), 1); - assert!(can_concat(&ret.input_levels[0].table_infos)); + let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); + assert!(ret.is_none()) } #[test] diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index c19c06d9ab2cd..1518495df0f7c 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -552,7 +552,7 @@ impl MetaMetrics { let opts = histogram_opts!( "storage_compact_task_size", "Total size of compact that have been issued to state store", - exponential_buckets(4096.0, 1.6, 28).unwrap() + exponential_buckets(1048576.0, 2.0, 16).unwrap() ); let compact_task_size = diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index ca5418cbe6a0f..5864fa9c0a484 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -422,67 +422,54 @@ pub(crate) mod tests { .await; // 2. get compact task - let mut compact_task = hummock_manager_ref + + // 3. compact + while let Some(compact_task) = hummock_manager_ref .get_compact_task( StaticCompactionGroupId::StateDefault.into(), &mut default_level_selector(), ) .await .unwrap() - .unwrap(); - let compaction_filter_flag = CompactionFilterFlag::NONE; - compact_task.compaction_filter_mask = compaction_filter_flag.bits(); - compact_task.current_epoch_time = 0; - - // assert compact_task - assert_eq!( - compact_task - .input_ssts - .iter() - .map(|level| level.table_infos.len()) - .sum::(), - SST_COUNT as usize / 2 + 1, - ); - compact_task.target_level = 6; - - // 3. compact - let (_tx, rx) = tokio::sync::oneshot::channel(); - let (mut result_task, task_stats) = compact( - compact_ctx, - compact_task.clone(), - rx, - Box::new(sstable_object_id_manager.clone()), - ) - .await; + { + // 3. compact + let (_tx, rx) = tokio::sync::oneshot::channel(); + let (mut result_task, task_stats) = compact( + compact_ctx.clone(), + compact_task.clone(), + rx, + Box::new(sstable_object_id_manager.clone()), + ) + .await; - hummock_manager_ref - .report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats))) - .await - .unwrap(); + hummock_manager_ref + .report_compact_task(&mut result_task, Some(to_prost_table_stats_map(task_stats))) + .await + .unwrap(); + } // 4. get the latest version and check let version = hummock_manager_ref.get_current_version().await; - let output_table = version + let output_tables = version .get_compaction_group_levels(StaticCompactionGroupId::StateDefault.into()) .levels - .last() - .unwrap() - .table_infos - .first() - .unwrap(); - let table = storage - .sstable_store() - .sstable(output_table, &mut StoreLocalStatistic::default()) - .await - .unwrap(); - let target_table_size = storage.storage_opts().sstable_size_mb * (1 << 20); - - assert!( - table.value().meta.estimated_size > target_table_size, - "table.meta.estimated_size {} <= target_table_size {}", - table.value().meta.estimated_size, - target_table_size - ); + .iter() + .flat_map(|level| level.table_infos.clone()) + .collect_vec(); + for output_table in &output_tables { + let table = storage + .sstable_store() + .sstable(output_table, &mut StoreLocalStatistic::default()) + .await + .unwrap(); + let target_table_size = storage.storage_opts().sstable_size_mb * (1 << 20); + assert!( + table.value().meta.estimated_size > target_table_size, + "table.meta.estimated_size {} <= target_table_size {}", + table.value().meta.estimated_size, + target_table_size + ); + } // 5. storage get back the correct kv after compaction storage.wait_version(version).await;