diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index 54e64b5089ac5..fdd7c8d10377d 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -241,7 +241,7 @@ impl DynamicLevelSelectorCore { .count() as u64; let non_overlapping_level_score = non_overlapping_level_count * SCORE_BASE / std::cmp::max( - base_level_sst_count / 16, + base_level_sst_count / 8, self.config.level0_sub_level_compact_level_count as u64, ); diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 1e0b42e4c70d8..b84fb975bb6aa 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -281,12 +281,6 @@ pub fn create_compaction_task( ) -> CompactionTask { let target_file_size = if input.target_level == 0 { compaction_config.target_file_size_base - } else if input.target_level == base_level { - // This is just a temporary optimization measure. We hope to reduce the size of SST as much - // as possible to reduce the amount of data blocked by a single task during compaction, - // but too many files will increase computing overhead. - // TODO: remove it after can reduce configuration `target_file_size_base`. - compaction_config.target_file_size_base / 4 } else { assert!(input.target_level >= base_level); let step = (input.target_level - base_level) / 2; diff --git a/src/meta/src/hummock/compaction/overlap_strategy.rs b/src/meta/src/hummock/compaction/overlap_strategy.rs index a94efd0ca9888..6c5e31a03947e 100644 --- a/src/meta/src/hummock/compaction/overlap_strategy.rs +++ b/src/meta/src/hummock/compaction/overlap_strategy.rs @@ -24,7 +24,11 @@ pub trait OverlapInfo { fn check_overlap(&self, a: &SstableInfo) -> bool; fn check_multiple_overlap(&self, others: &[SstableInfo]) -> Range; fn check_multiple_include(&self, others: &[SstableInfo]) -> Range; - fn update(&mut self, table: &SstableInfo); + fn update(&mut self, table: &SstableInfo) { + let other = table.key_range.as_ref().unwrap(); + self.update_key_range(other); + } + fn update_key_range(&mut self, table: &KeyRange); } pub trait OverlapStrategy: Send + Sync { @@ -45,6 +49,7 @@ pub trait OverlapStrategy: Send + Sync { others[range].to_vec() } } + fn check_overlap_with_tables( &self, tables: &[SstableInfo], @@ -112,19 +117,25 @@ impl OverlapInfo for RangeOverlapInfo { fn check_multiple_include(&self, others: &[SstableInfo]) -> Range { match self.target_range.as_ref() { Some(key_range) => { - let overlap_begin = others.partition_point(|table_status| { - KeyComparator::compare_encoded_full_key( - &table_status.key_range.as_ref().unwrap().left, - &key_range.left, - ) == cmp::Ordering::Less + let overlap_begin = others.partition_point(|sst| { + let ord = if key_range.left.is_empty() { + cmp::Ordering::Greater + } else { + KeyComparator::compare_encoded_full_key( + &sst.key_range.as_ref().unwrap().left, + &key_range.left, + ) + }; + ord == cmp::Ordering::Less }); if overlap_begin >= others.len() { return overlap_begin..overlap_begin; } let mut overlap_end = overlap_begin; - for table in &others[overlap_begin..] { - if key_range.compare_right_with(&table.key_range.as_ref().unwrap().right) - == cmp::Ordering::Less + for sst in &others[overlap_begin..] { + if !key_range.right.is_empty() + && key_range.compare_right_with(&sst.key_range.as_ref().unwrap().right) + == cmp::Ordering::Less { break; } @@ -136,8 +147,7 @@ impl OverlapInfo for RangeOverlapInfo { } } - fn update(&mut self, table: &SstableInfo) { - let other = table.key_range.as_ref().unwrap(); + fn update_key_range(&mut self, other: &KeyRange) { if let Some(range) = self.target_range.as_mut() { range.full_key_extend(other); return; diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index a15fab694ee86..f6fe2ac0e6515 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -22,6 +22,7 @@ use risingwave_pb::hummock::{CompactionConfig, InputLevel, Level, LevelType, Ove use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker; use super::{CompactionInput, CompactionPicker, LocalPickerStatistic}; use crate::hummock::compaction::create_overlap_strategy; +use crate::hummock::compaction::picker::include_sst_picker::L0IncludeSstPicker; use crate::hummock::compaction::picker::TrivialMovePicker; use crate::hummock::level_handler::LevelHandler; @@ -48,14 +49,6 @@ impl CompactionPicker for LevelCompactionPicker { return None; } - let is_l0_pending_compact = - level_handlers[0].is_level_all_pending_compact(&l0.sub_levels[0]); - - if is_l0_pending_compact { - stats.skip_by_pending_files += 1; - return None; - } - if let Some(ret) = self.pick_base_trivial_move( l0, levels.get_level(self.target_level), @@ -155,9 +148,14 @@ impl LevelCompactionPicker { return None; } + let max_target_level_size = std::cmp::min( + self.config.max_compaction_bytes / 8, + self.config.sub_level_max_compaction_bytes, + ); let mut skip_by_pending = false; let mut input_levels = vec![]; let mut min_write_amp_meet = false; + let mut exist_small_task = false; for input in l0_select_tables_vec { let l0_select_tables = input .sstable_infos @@ -189,11 +187,67 @@ impl LevelCompactionPicker { if target_level_size > self.config.max_compaction_bytes && strict_check { continue; } + let mut min_overlap_meet = false; - if input.total_file_size >= target_level_size { + if target_level_size <= input.total_file_size { min_write_amp_meet = true; + if target_level_size <= max_target_level_size { + min_overlap_meet = true; + } + } + if !min_overlap_meet { + let mut expand_overlap_info = overlap_strategy.create_overlap_info(); + let mut overlap_info = overlap_strategy.create_overlap_info(); + let mut total_file_size = 0; + let mut target_level_files = vec![]; + for next_sst in &target_level_ssts { + if total_file_size > 0 + && total_file_size + next_sst.file_size > max_target_level_size + { + break; + } + if level_handlers[self.target_level].is_pending_compact(&next_sst.sst_id) { + break; + } + total_file_size += next_sst.file_size; + target_level_files.push(next_sst.clone()); + let key_range = next_sst.key_range.as_ref().unwrap(); + overlap_info.update_key_range(key_range); + let is_first = + next_sst.sst_id == target_level.table_infos.first().unwrap().sst_id; + let is_end = next_sst.sst_id == target_level.table_infos.last().unwrap().sst_id; + if is_first { + let mut key_range = key_range.clone(); + key_range.left.clear(); + expand_overlap_info.update_key_range(&key_range); + } else if is_end { + let mut key_range = key_range.clone(); + key_range.right.clear(); + expand_overlap_info.update_key_range(&key_range); + } else { + expand_overlap_info.update_key_range(key_range); + } + } + if !target_level_files.is_empty() { + let picker = L0IncludeSstPicker::new( + overlap_strategy.clone(), + self.config.max_compaction_bytes / 2, + self.config.level0_max_compact_file_number / 2, + ); + let input = picker.pick_tables( + expand_overlap_info.as_ref(), + overlap_info.as_ref(), + &l0.sub_levels, + &level_handlers[0], + ); + + if input.total_file_size > total_file_size && input.sstable_infos.len() > 1 { + min_write_amp_meet = true; + exist_small_task = true; + input_levels.push((input, total_file_size, target_level_files)); + } + } } - input_levels.push((input, target_level_size, target_level_ssts)); } @@ -203,6 +257,13 @@ impl LevelCompactionPicker { } return None; } + input_levels.sort_by(|(a, _, _), (b, _, _)| { + b.sstable_infos + .len() + .cmp(&a.sstable_infos.len()) + .then_with(|| a.total_file_count.cmp(&b.total_file_count)) + .then_with(|| a.total_file_size.cmp(&b.total_file_size)) + }); if !min_write_amp_meet && strict_check { // If the write-amplification of all candidate task are large, we may hope to wait base @@ -214,7 +275,11 @@ impl LevelCompactionPicker { } for (input, target_file_size, target_level_files) in input_levels { - if min_write_amp_meet && input.total_file_size < target_file_size { + if min_write_amp_meet && target_file_size > input.total_file_size { + continue; + } + + if exist_small_task && target_file_size > max_target_level_size { continue; } @@ -355,8 +420,11 @@ impl LevelCompactionPicker { level_handlers: &[LevelHandler], stats: &mut LocalPickerStatistic, ) -> Option { + if l0.sub_levels.is_empty() { + return None; + } let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); - + // only pick tables for trivial move for (idx, level) in l0.sub_levels.iter().enumerate() { if level.level_type == LevelType::Overlapping as i32 || idx + 1 >= l0.sub_levels.len() { continue; @@ -442,7 +510,7 @@ pub mod tests { let config = Arc::new( CompactionConfigBuilder::new() .level0_tier_compact_file_number(2) - .level0_sub_level_compact_level_count(1) + .level0_sub_level_compact_level_count(2) .build(), ); LevelCompactionPicker::new(1, config) @@ -450,7 +518,15 @@ pub mod tests { #[test] fn test_compact_l0_to_l1() { - let mut picker = create_compaction_picker_for_test(); + let config = Arc::new( + CompactionConfigBuilder::new() + .level0_tier_compact_file_number(2) + .level0_sub_level_compact_level_count(2) + .max_bytes_for_level_base(200) + .sub_level_max_compaction_bytes(150) + .build(), + ); + let mut picker = LevelCompactionPicker::new(1, config); let l0 = generate_level( 0, vec![ @@ -518,11 +594,10 @@ pub mod tests { ret.add_pending_task(1, &mut levels_handler); let mut local_stats = LocalPickerStatistic::default(); - // Cannot pick because no idle table in sub-level[0]. (And sub-level[0] is pending - // actually). push_table_level0_overlapping(&mut levels, generate_table(8, 1, 199, 233, 3)); - let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); - assert!(ret.is_none()); + let _ = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); // Don't pick overlapping sub-level 8 levels_handler[0].remove_task(1); @@ -535,6 +610,7 @@ pub mod tests { assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 5); assert_eq!(ret.input_levels[2].table_infos.len(), 2); } + #[test] fn test_selecting_key_range_overlap() { // When picking L0->L1, all L1 files overlapped with selecting_key_range should be picked. @@ -543,6 +619,9 @@ pub mod tests { .level0_tier_compact_file_number(2) .compaction_mode(CompactionMode::Range as i32) .level0_sub_level_compact_level_count(1) + .max_bytes_for_level_base(500) + .sub_level_max_compaction_bytes(100) + .max_compaction_bytes(1000) .build(), ); let mut picker = LevelCompactionPicker::new(1, config); @@ -664,15 +743,16 @@ pub mod tests { // When picking L0->L0, L0's selecting_key_range should not be overlapped with L0's // compacting_key_range. let mut picker = create_compaction_picker_for_test(); + let sst = generate_table(3, 1, 200, 300, 2); let mut levels = Levels { levels: vec![Level { level_idx: 1, level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![generate_table(3, 1, 200, 300, 2)], - total_file_size: 0, + total_file_size: sst.file_size, + uncompressed_file_size: sst.uncompressed_file_size, + table_infos: vec![sst], sub_level_id: 0, - uncompressed_file_size: 0, }], l0: Some(generate_l0_nonoverlapping_sublevels(vec![ generate_table(1, 1, 100, 210, 2), @@ -769,10 +849,10 @@ pub mod tests { // Pick with large max_compaction_bytes results all sub levels included in input. let config = Arc::new( CompactionConfigBuilder::new() - .max_compaction_bytes(500000) - .sub_level_max_compaction_bytes(50000) + .max_compaction_bytes(1000000) + .sub_level_max_compaction_bytes(120000) .max_bytes_for_level_base(500000) - .level0_sub_level_compact_level_count(1) + .level0_sub_level_compact_level_count(2) .build(), ); // Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION. @@ -806,11 +886,11 @@ pub mod tests { let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); - assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 6); assert_eq!( 2, ret.input_levels.iter().filter(|l| l.level_idx == 0).count() ); + assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 6); assert_eq!( 3, ret.input_levels @@ -936,14 +1016,11 @@ pub mod tests { #[test] fn test_l0_to_base_when_all_base_pending() { let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![ - vec![ - generate_table(4, 1, 10, 90, 1), - generate_table(5, 1, 1000, 2000, 1), - ], + vec![generate_table(4, 1, 10, 90, 1)], vec![generate_table(6, 1, 10, 90, 1)], ]); - let levels = Levels { + let mut levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(3, 1, 1, 100, 1)])], member_table_ids: vec![1], @@ -964,19 +1041,21 @@ pub mod tests { let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); - // println!("ret.input_levels: {:?}", ret.input_levels); // 1. trivial_move - assert_eq!(2, ret.input_levels.len()); - assert!(ret.input_levels[1].table_infos.is_empty()); - assert_eq!(5, ret.input_levels[0].table_infos[0].sst_id); + assert_eq!(3, ret.input_levels.len()); + assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id); ret.add_pending_task(0, &mut levels_handler); - + let sst = generate_table(5, 1, 1000, 2000, 1); + levels.l0.as_mut().unwrap().sub_levels[0].total_file_size += sst.file_size; + levels.l0.as_mut().unwrap().sub_levels[0] + .table_infos + .push(sst); let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); println!("ret.input_levels: {:?}", ret.input_levels); - assert_eq!(3, ret.input_levels.len()); - assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id); + assert_eq!(2, ret.input_levels.len()); + assert_eq!(5, ret.input_levels[0].table_infos[0].sst_id); } #[test] diff --git a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs new file mode 100644 index 0000000000000..9bc6e12006772 --- /dev/null +++ b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs @@ -0,0 +1,101 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use risingwave_pb::hummock::{Level, LevelType}; + +use crate::hummock::compaction::overlap_strategy::{OverlapInfo, OverlapStrategy}; +use crate::hummock::compaction::picker::min_overlap_compaction_picker::SubLevelSstables; +use crate::hummock::level_handler::LevelHandler; +pub const MAX_LEVEL_COUNT: usize = 32; + +pub struct L0IncludeSstPicker { + overlap_strategy: Arc, + max_compact_size: u64, + max_file_count: u64, +} + +impl L0IncludeSstPicker { + pub fn new( + overlap_strategy: Arc, + max_compact_size: u64, + max_file_count: u64, + ) -> Self { + Self { + overlap_strategy, + max_compact_size, + max_file_count, + } + } + + pub fn pick_tables( + &self, + include_info: &dyn OverlapInfo, + overlap_info: &dyn OverlapInfo, + sub_levels: &[Level], + level_handler: &LevelHandler, + ) -> SubLevelSstables { + let mut include_infos: Vec> = vec![]; + let mut ret = SubLevelSstables::default(); + for level in sub_levels { + if level.level_type() == LevelType::Overlapping + || ret.total_file_size > self.max_compact_size + || ret.sstable_infos.len() >= MAX_LEVEL_COUNT + || ret.total_file_count as u64 > self.max_file_count + { + break; + } + let overlap_range = overlap_info.check_multiple_overlap(&level.table_infos); + let mut range = include_info.check_multiple_include(&level.table_infos); + range.start = std::cmp::max(range.start, overlap_range.start); + range.end = std::cmp::min(range.end, overlap_range.end); + for overlap in &include_infos { + let old_range = overlap.check_multiple_include(&level.table_infos); + range.start = std::cmp::max(range.start, old_range.start); + range.end = std::cmp::min(range.end, old_range.end); + } + if range.start >= range.end { + break; + } + for index in range.start..range.end { + if level_handler.is_pending_compact(&level.table_infos[index].sst_id) { + return ret; + } + } + let mut overlap = self.overlap_strategy.create_overlap_info(); + ret.sstable_infos + .push(level.table_infos[range.clone()].to_vec()); + for index in range.start..range.end { + ret.total_file_count += 1; + ret.total_file_size += level.table_infos[index].file_size; + let key_range = level.table_infos[index].key_range.as_ref().unwrap(); + if index > 0 && index + 1 < level.table_infos.len() { + overlap.update_key_range(key_range); + continue; + } + let mut key_range = key_range.clone(); + if index == 0 { + key_range.left.clear(); + } + if index + 1 == level.table_infos.len() { + key_range.right.clear(); + } + overlap.update_key_range(&key_range); + } + include_infos.push(overlap); + } + ret + } +} diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index 7c170786a0f0d..972d4c05707f6 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod base_level_compaction_picker; +mod include_sst_picker; mod manual_compaction_picker; mod min_overlap_compaction_picker; mod space_reclaim_compaction_picker; diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 52ae6b6121b85..793d16cd598c8 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -54,16 +54,13 @@ impl HummockManager { pub(super) async fn build_compaction_group_manager( env: &MetaSrvEnv, ) -> Result> { - let default_config = match env.opts.compaction_config.as_ref() { - None => CompactionConfigBuilder::new().build(), - Some(opt) => CompactionConfigBuilder::with_opt(opt).build(), - }; - Self::build_compaction_group_manager_with_config(env, default_config).await + let opts = env.opts.compaction_config.clone(); + Self::build_compaction_group_manager_with_config(env, opts).await } pub(super) async fn build_compaction_group_manager_with_config( env: &MetaSrvEnv, - default_config: CompactionConfig, + default_config: risingwave_common::config::CompactionConfig, ) -> Result> { let compaction_group_manager = RwLock::new(CompactionGroupManager { compaction_groups: BTreeMap::new(), @@ -554,11 +551,9 @@ impl HummockManager { .generate::<{ IdCategory::CompactionGroup }>() .await?; // The new config will be persisted later. - let mut config = self - .compaction_group_manager - .read() - .await - .default_compaction_config(); + let manager = self.compaction_group_manager.read().await; + let opts = manager.default_compaction_config(); + let mut config = CompactionConfigBuilder::with_opt(opts).build(); config.split_by_state_table = allow_split_by_table; config.split_weight_by_vnode = weight_split_by_vnode; @@ -725,7 +720,7 @@ impl HummockManager { #[derive(Default)] pub(super) struct CompactionGroupManager { compaction_groups: BTreeMap, - default_config: CompactionConfig, + default_config: risingwave_common::config::CompactionConfig, } impl CompactionGroupManager { @@ -765,7 +760,10 @@ impl CompactionGroupManager { if compaction_groups.contains_key(id) { continue; } - let new_entry = CompactionGroup::new(*id, self.default_config.clone()); + let new_entry = CompactionGroup::new( + *id, + CompactionConfigBuilder::with_opt(&self.default_config).build(), + ); compaction_groups.insert(*id, new_entry); } let mut trx = Transaction::default(); @@ -787,8 +785,8 @@ impl CompactionGroupManager { self.compaction_groups.get(&compaction_group_id).cloned() } - pub(super) fn default_compaction_config(&self) -> CompactionConfig { - self.default_config.clone() + pub(super) fn default_compaction_config(&self) -> &risingwave_common::config::CompactionConfig { + &self.default_config } async fn update_compaction_config( diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 043c7c5b84d20..0741b76e9a85f 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -38,9 +38,9 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ BranchedSstInfo, HummockLevelsExt, HummockVersionExt, HummockVersionUpdateExt, }; use risingwave_hummock_sdk::{ - version_checkpoint_path, CompactionGroupId, ExtendedSstableInfo, HummockCompactionTaskId, - HummockContextId, HummockEpoch, HummockSstableId, HummockSstableObjectId, HummockVersionId, - SstObjectIdRange, INVALID_VERSION_ID, + append_sstable_info_to_string, version_checkpoint_path, CompactionGroupId, ExtendedSstableInfo, + HummockCompactionTaskId, HummockContextId, HummockEpoch, HummockSstableId, + HummockSstableObjectId, HummockVersionId, SstObjectIdRange, INVALID_VERSION_ID, }; use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType}; use risingwave_pb::hummock::group_delta::DeltaType; @@ -287,17 +287,16 @@ where fragment_manager: FragmentManagerRef, metrics: Arc, compactor_manager: CompactorManagerRef, - config: CompactionConfig, + opts: risingwave_common::config::CompactionConfig, compactor_streams_change_tx: UnboundedSender<( u32, Streaming, )>, ) -> HummockManagerRef { use crate::manager::CatalogManager; - let compaction_group_manager = - Self::build_compaction_group_manager_with_config(&env, config) - .await - .unwrap(); + let compaction_group_manager = Self::build_compaction_group_manager_with_config(&env, opts) + .await + .unwrap(); let catalog_manager = Arc::new(CatalogManager::new(env.clone()).await.unwrap()); Self::new_impl( env, @@ -449,11 +448,12 @@ where checkpoint } else { // As no record found in stores, create a initial version. - let default_compaction_config = self - .compaction_group_manager - .read() - .await - .default_compaction_config(); + let default_compaction_config = { + let compaction_group_manager = self.compaction_group_manager.read().await; + let default_compaction = compaction_group_manager.default_compaction_config(); + CompactionConfigBuilder::with_opt(default_compaction).build() + }; + let checkpoint = create_init_version(default_compaction_config); tracing::info!("init hummock version checkpoint"); HummockVersionStats::default() @@ -892,6 +892,21 @@ where compact_task.sorted_output_ssts = compact_task.input_ssts[0].table_infos.clone(); // this task has been finished and `trivial_move_task` does not need to be schedule. compact_task.set_task_status(TaskStatus::Success); + if compact_task.target_level != 0 { + let mut output_info = String::default(); + for level in &compact_task.input_ssts { + for sst in &level.table_infos { + append_sstable_info_to_string(&mut output_info, sst); + } + } + tracing::info!( + "TrivialMove group-{} target level: {}, ssts: {}", + compaction_group_id, + compact_task.target_level, + output_info + ); + } + self.report_compact_task_impl(&mut compact_task, &mut compaction_guard, None) .await?; tracing::debug!( @@ -2833,6 +2848,8 @@ fn init_selectors() -> HashMap> { type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType); use tokio::sync::mpsc::error::SendError; +use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; + #[derive(Debug, Default)] pub struct CompactionState { scheduled: Mutex>, diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 14e262d40c083..7fcb3fd05e548 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -24,12 +24,9 @@ use risingwave_hummock_sdk::{ use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; #[cfg(test)] use risingwave_pb::hummock::compact_task::TaskStatus; -use risingwave_pb::hummock::{ - CompactionConfig, HummockSnapshot, HummockVersion, KeyRange, SstableInfo, -}; +use risingwave_pb::hummock::{HummockSnapshot, HummockVersion, KeyRange, SstableInfo}; use risingwave_pb::meta::add_worker_node_request::Property; -use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; #[cfg(test)] use crate::hummock::compaction::default_level_selector; use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef}; @@ -302,7 +299,7 @@ pub fn get_sorted_committed_object_ids( } pub async fn setup_compute_env_with_config( port: i32, - config: CompactionConfig, + opts: risingwave_common::config::CompactionConfig, ) -> ( MetaSrvEnv, HummockManagerRef, @@ -328,7 +325,7 @@ pub async fn setup_compute_env_with_config( fragment_manager, Arc::new(MetaMetrics::new()), compactor_manager, - config, + opts, compactor_streams_change_tx, ) .await; @@ -361,13 +358,14 @@ pub async fn setup_compute_env( ClusterManagerRef, WorkerNode, ) { - let config = CompactionConfigBuilder::new() - .level0_tier_compact_file_number(1) - .level0_max_compact_file_number(130) - .level0_sub_level_compact_level_count(1) - .level0_overlapping_sub_level_compact_level_count(1) - .build(); - setup_compute_env_with_config(port, config).await + let opts = risingwave_common::config::CompactionConfig { + level0_tier_compact_file_number: 1, + level0_max_compact_file_number: 130, + level0_sub_level_compact_level_count: 1, + level0_overlapping_sub_level_compact_level_count: 1, + ..Default::default() + }; + setup_compute_env_with_config(port, opts).await } pub async fn get_sst_ids( diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index b7a77b10d475f..cd70e01af9214 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -299,7 +299,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { compaction_task_max_heartbeat_interval_secs: config .meta .compaction_task_max_heartbeat_interval_secs, - compaction_config: Some(config.meta.compaction_config), + compaction_config: config.meta.compaction_config, }, config.system.into_init_system_params(), ) diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 38adf0ba287b5..92548389517d9 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -156,7 +156,7 @@ pub struct MetaOpts { pub min_table_split_write_throughput: u64, pub compaction_task_max_heartbeat_interval_secs: u64, - pub compaction_config: Option, + pub compaction_config: CompactionConfig, } impl MetaOpts { @@ -194,7 +194,7 @@ impl MetaOpts { do_not_config_object_storage_lifecycle: true, partition_vnode_count: 32, compaction_task_max_heartbeat_interval_secs: 0, - compaction_config: None, + compaction_config: CompactionConfig::default(), } } } diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index 80da4ca57e2b3..28b1ec6a94d4f 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1035,7 +1035,15 @@ fn level_insert_ssts(operand: &mut Level, insert_table_infos: Vec) if operand.level_type == LevelType::Overlapping as i32 { operand.level_type = LevelType::Nonoverlapping as i32; } - assert!(can_concat(&operand.table_infos)); + assert!( + can_concat(&operand.table_infos), + "output: {:?}", + operand + .table_infos + .iter() + .map(|sst| sst.sst_id) + .collect_vec() + ); } pub fn object_size_map(version: &HummockVersion) -> HashMap { diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 8662a06ea3b9b..73b5b597ff439 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -32,7 +32,6 @@ pub(crate) mod tests { use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{next_key, TABLE_PREFIX_LEN}; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; - use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use risingwave_meta::hummock::compaction::{default_level_selector, ManualCompactionOption}; use risingwave_meta::hummock::test_utils::{ register_table_ids_to_compaction_group, setup_compute_env, setup_compute_env_with_config, @@ -204,13 +203,14 @@ pub(crate) mod tests { #[tokio::test] async fn test_compaction_watermark() { - let config = CompactionConfigBuilder::new() - .level0_tier_compact_file_number(1) - .level0_max_compact_file_number(130) - .level0_overlapping_sub_level_compact_level_count(1) - .build(); + let opts = risingwave_common::config::CompactionConfig { + level0_tier_compact_file_number: 1, + level0_max_compact_file_number: 130, + level0_overlapping_sub_level_compact_level_count: 1, + ..Default::default() + }; let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = - setup_compute_env_with_config(8080, config).await; + setup_compute_env_with_config(8080, opts).await; let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( hummock_manager_ref.clone(), worker_node.id, @@ -275,7 +275,23 @@ pub(crate) mod tests { }, )]); compact_task.current_epoch_time = 0; - + compact_task.input_ssts = group + .l0 + .as_ref() + .unwrap() + .sub_levels + .iter() + .map(|level| InputLevel { + level_idx: 0, + level_type: level.level_type, + table_infos: level.table_infos.clone(), + }) + .collect(); + compact_task.input_ssts.push(InputLevel { + level_idx: group.levels.last().unwrap().level_idx, + table_infos: group.levels.last().unwrap().table_infos.clone(), + level_type: group.levels.last().unwrap().level_type, + }); let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact(Arc::new(compact_ctx.clone()), compact_task.clone(), rx).await; diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index 9533119ec3733..eb6117396ef47 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -26,7 +26,6 @@ use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{next_key, user_key}; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::HummockVersionId; -use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use risingwave_meta::hummock::compaction::{default_level_selector, ManualCompactionOption}; use risingwave_meta::hummock::test_utils::{ add_ssts, register_table_ids_to_compaction_group, setup_compute_env, @@ -263,12 +262,13 @@ pub async fn compact_once( #[cfg(feature = "sync_point")] #[serial] async fn test_syncpoints_get_in_delete_range_boundary() { - let config = CompactionConfigBuilder::new() - .level0_tier_compact_file_number(1) - .max_bytes_for_level_base(4096) - .build(); + let opts = risingwave_common::config::CompactionConfig { + level0_tier_compact_file_number: 1, + max_bytes_for_level_base: 4096, + ..Default::default() + }; let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = - setup_compute_env_with_config(8080, config).await; + setup_compute_env_with_config(8080, opts).await; let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( hummock_manager_ref.clone(), worker_node.id, diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index b55c6c90448b5..ac431b214e2d5 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -36,7 +36,7 @@ use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::catalog::PbTable; -use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo}; +use risingwave_pb::hummock::CompactionGroupInfo; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::filter_key_extractor::{ @@ -88,9 +88,8 @@ pub fn start_delete_range(opts: CompactionTestOpts) -> Pin anyhow::Result<()> { let config = load_config(&opts.config_path, NoOverride); - let compaction_config = CompactionConfigBuilder::new().build(); compaction_test( - compaction_config, + risingwave_common::config::CompactionConfig::default(), config, &opts.state_store, 1000000, @@ -101,15 +100,16 @@ pub async fn compaction_test_main(opts: CompactionTestOpts) -> anyhow::Result<() } async fn compaction_test( - compaction_config: CompactionConfig, + opts: risingwave_common::config::CompactionConfig, config: RwConfig, state_store_type: &str, test_range: u64, test_count: u64, test_delete_ratio: u32, ) -> anyhow::Result<()> { + let compaction_config = CompactionConfigBuilder::with_opt(&opts).build(); let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = - setup_compute_env_with_config(8080, compaction_config.clone()).await; + setup_compute_env_with_config(8080, opts).await; let meta_client = Arc::new(MockHummockMetaClient::new( hummock_manager_ref.clone(), worker_node.id, @@ -588,20 +588,21 @@ fn run_compactor_thread( mod tests { use risingwave_common::config::RwConfig; - use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use super::compaction_test; #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn test_small_data() { let config = RwConfig::default(); - let mut compaction_config = CompactionConfigBuilder::new().build(); - compaction_config.max_sub_compaction = 1; - compaction_config.level0_tier_compact_file_number = 2; - compaction_config.max_bytes_for_level_base = 512 * 1024; - compaction_config.sub_level_max_compaction_bytes = 256 * 1024; + let compaction_config = risingwave_common::config::CompactionConfig { + max_sub_compaction: 1, + level0_tier_compact_file_number: 1, + max_bytes_for_level_base: 512 * 1024, + sub_level_max_compaction_bytes: 256 * 1024, + ..Default::default() + }; compaction_test( - compaction_config.clone(), + compaction_config, config.clone(), "hummock+memory", 1000000,