From f071ecbe3c8460e4876a6855d2010ffd02a02a58 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 3 Aug 2023 10:04:53 +0800 Subject: [PATCH 01/17] compact half task Signed-off-by: Little-Wallace --- src/common/src/config.rs | 12 +++ .../src/hummock/compaction/level_selector.rs | 2 +- .../hummock/compaction/overlap_strategy.rs | 10 ++- .../picker/base_level_compaction_picker.rs | 64 ++++++++++++++- .../compaction/picker/include_sst_picker.rs | 80 +++++++++++++++++++ src/meta/src/hummock/compaction/picker/mod.rs | 1 + .../manager/compaction_group_manager.rs | 32 ++++---- src/meta/src/hummock/manager/mod.rs | 22 ++--- src/meta/src/hummock/test_utils.rs | 24 +++--- src/meta/src/lib.rs | 2 +- src/meta/src/manager/env.rs | 4 +- .../hummock_test/src/compactor_tests.rs | 13 +-- .../hummock_test/src/sync_point_tests.rs | 11 +-- .../src/delete_range_runner.rs | 22 ++--- 14 files changed, 231 insertions(+), 68 deletions(-) create mode 100644 src/meta/src/hummock/compaction/picker/include_sst_picker.rs diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 06c8794c462c3..956c922f534a5 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1054,6 +1054,13 @@ pub mod default { pub fn level0_max_compact_file_number() -> u64 { DEFAULT_MAX_COMPACTION_FILE_COUNT } + + pub fn large_group_max_bytes_for_level_base() -> u64 { + DEFAULT_MAX_BYTES_FOR_LEVEL_BASE * 4 + } + pub fn large_group_sub_level_max_compaction_bytes() -> u64 { + DEFAULT_MIN_COMPACTION_BYTES * 3 + } } } @@ -1133,6 +1140,11 @@ pub struct CompactionConfig { pub max_space_reclaim_bytes: u64, #[serde(default = "default::compaction_config::level0_max_compact_file_number")] pub level0_max_compact_file_number: u64, + + #[serde(default = "default::compaction_config::large_group_max_bytes_for_level_base")] + pub large_group_max_bytes_for_level_base: u64, + #[serde(default = "default::compaction_config::large_group_sub_level_max_compaction_bytes")] + pub large_group_sub_level_max_compaction_bytes: u64, } #[cfg(test)] diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index 4f75f43fa6ad1..797291a3871a2 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -241,7 +241,7 @@ impl DynamicLevelSelectorCore { .count() as u64; let non_overlapping_level_score = non_overlapping_level_count * SCORE_BASE / std::cmp::max( - base_level_sst_count / 16, + base_level_sst_count / 32, self.config.level0_sub_level_compact_level_count as u64, ); diff --git a/src/meta/src/hummock/compaction/overlap_strategy.rs b/src/meta/src/hummock/compaction/overlap_strategy.rs index a94efd0ca9888..8ce1d2c0a0c4e 100644 --- a/src/meta/src/hummock/compaction/overlap_strategy.rs +++ b/src/meta/src/hummock/compaction/overlap_strategy.rs @@ -24,7 +24,11 @@ pub trait OverlapInfo { fn check_overlap(&self, a: &SstableInfo) -> bool; fn check_multiple_overlap(&self, others: &[SstableInfo]) -> Range; fn check_multiple_include(&self, others: &[SstableInfo]) -> Range; - fn update(&mut self, table: &SstableInfo); + fn update(&mut self, table: &SstableInfo) { + let other = table.key_range.as_ref().unwrap(); + self.update_key_range(other); + } + fn update_key_range(&mut self, table: &KeyRange); } pub trait OverlapStrategy: Send + Sync { @@ -45,6 +49,7 @@ pub trait OverlapStrategy: Send + Sync { others[range].to_vec() } } + fn check_overlap_with_tables( &self, tables: &[SstableInfo], @@ -136,8 +141,7 @@ impl OverlapInfo for RangeOverlapInfo { } } - fn update(&mut self, table: &SstableInfo) { - let other = table.key_range.as_ref().unwrap(); + fn update_key_range(&mut self, other: &KeyRange) { if let Some(range) = self.target_range.as_mut() { range.full_key_extend(other); return; diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 35e0538cb76f7..471262cbcdbd2 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -22,6 +22,7 @@ use risingwave_pb::hummock::{CompactionConfig, InputLevel, Level, LevelType, Ove use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker; use super::{CompactionInput, CompactionPicker, LocalPickerStatistic}; use crate::hummock::compaction::create_overlap_strategy; +use crate::hummock::compaction::picker::include_sst_picker::L0IncludeSstPicker; use crate::hummock::compaction::picker::MinOverlappingPicker; use crate::hummock::level_handler::LevelHandler; @@ -176,6 +177,63 @@ impl LevelCompactionPicker { return None; } + let mut last_ssts = vec![]; + let mut total_file_size = 0; + let mut last_overlap_info = overlap_strategy.create_overlap_info(); + let mut exist_small_task = false; + let max_target_file_size = std::cmp::min( + self.config.max_compaction_bytes / 8, + self.config.sub_level_max_compaction_bytes, + ); + for (idx, sst) in target_level.table_infos.iter().enumerate() { + let pending_compact = + level_handlers[target_level.level_idx as usize].is_pending_compact(&sst.sst_id); + if !last_ssts.is_empty() + && (total_file_size + sst.file_size > max_target_file_size || pending_compact) + { + let picker = L0IncludeSstPicker::new( + last_overlap_info, + overlap_strategy.clone(), + self.config.max_compaction_bytes / 2, + ); + last_overlap_info = overlap_strategy.create_overlap_info(); + let input = picker.pick_tables(&l0.sub_levels, &level_handlers[0]); + if !input.sstable_infos.is_empty() && input.total_file_size > total_file_size { + min_write_amp_meet = true; + exist_small_task = true; + input_levels.push((input, total_file_size, std::mem::take(&mut last_ssts))); + } + last_ssts.clear(); + total_file_size = 0; + } + if pending_compact { + continue; + } + let key_range = sst.key_range.as_ref().unwrap(); + if idx > 0 && idx + 1 < target_level.table_infos.len() { + last_overlap_info.update_key_range(key_range); + } else if idx == 0 { + let mut key_range = key_range.clone(); + key_range.left.clear(); + last_overlap_info.update_key_range(&key_range); + } else { + let mut key_range = key_range.clone(); + key_range.right.clear(); + last_overlap_info.update_key_range(&key_range); + } + + total_file_size += sst.file_size; + last_ssts.push(sst.clone()); + } + input_levels.sort_by_key(|(input, _, _)| input.sstable_infos.len()); + input_levels.sort_by(|(a, _, _), (b, _, _)| { + b.sstable_infos + .len() + .cmp(&a.sstable_infos.len()) + .then_with(|| a.total_file_count.cmp(&b.total_file_count)) + .then_with(|| a.total_file_size.cmp(&b.total_file_size)) + }); + if !min_write_amp_meet && strict_check { // If the write-amplification of all candidate task are large, we may hope to wait base // level compact more data to lower level. But if we skip all task, I'm @@ -186,7 +244,11 @@ impl LevelCompactionPicker { } for (input, target_file_size, target_level_files) in input_levels { - if min_write_amp_meet && input.total_file_size < target_file_size { + if min_write_amp_meet && target_file_size > input.total_file_size { + continue; + } + + if exist_small_task && target_file_size > self.config.sub_level_max_compaction_bytes { continue; } diff --git a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs new file mode 100644 index 0000000000000..132918a6a9e28 --- /dev/null +++ b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs @@ -0,0 +1,80 @@ +use std::sync::Arc; + +use risingwave_pb::hummock::Level; + +use crate::hummock::compaction::overlap_strategy::{OverlapInfo, OverlapStrategy}; +use crate::hummock::compaction::picker::min_overlap_compaction_picker::SubLevelSstables; +use crate::hummock::level_handler::LevelHandler; +pub const MAX_LEVEL_COUNT: usize = 32; + +pub struct L0IncludeSstPicker { + overlap_info: Box, + overlap_strategy: Arc, + max_compact_size: u64, +} + +impl L0IncludeSstPicker { + pub fn new( + overlap_info: Box, + overlap_strategy: Arc, + max_compact_size: u64, + ) -> Self { + Self { + overlap_info, + overlap_strategy, + max_compact_size, + } + } + + pub fn pick_tables( + &self, + sub_levels: &[Level], + level_handler: &LevelHandler, + ) -> SubLevelSstables { + let mut overlaps: Vec> = vec![]; + let mut ret = SubLevelSstables::default(); + for level in sub_levels { + if ret.total_file_size > self.max_compact_size + || ret.sstable_infos.len() >= MAX_LEVEL_COUNT + { + break; + } + let mut range = self.overlap_info.check_multiple_include(&level.table_infos); + for overlap in &overlaps { + let old_range = overlap.check_multiple_include(&level.table_infos); + range.start = std::cmp::max(range.start, old_range.start); + range.end = std::cmp::min(range.end, old_range.end); + } + if range.start >= range.end { + break; + } + let pending_compact = range + .any(|index| level_handler.is_pending_compact(&level.table_infos[index].sst_id)); + if pending_compact { + break; + } + let mut overlap = self.overlap_strategy.create_overlap_info(); + ret.sstable_infos + .push(level.table_infos[range.clone()].to_vec()); + for index in range { + ret.total_file_count += 1; + ret.total_file_size += level.table_infos[index].file_size; + let key_range = level.table_infos[index].key_range.as_ref().unwrap(); + if index > 0 && index + 1 < level.table_infos.len() { + overlap.update_key_range(key_range); + continue; + } + let mut key_range = key_range.clone(); + if index == 0 { + key_range.left.clear(); + } + if index + 1 == level.table_infos.len() { + key_range.right.clear(); + } + overlap.update_key_range(&key_range); + } + overlaps.push(overlap); + } + ret + } +} diff --git a/src/meta/src/hummock/compaction/picker/mod.rs b/src/meta/src/hummock/compaction/picker/mod.rs index 27c879aba32ac..f282066dd8314 100644 --- a/src/meta/src/hummock/compaction/picker/mod.rs +++ b/src/meta/src/hummock/compaction/picker/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. mod base_level_compaction_picker; +mod include_sst_picker; mod manual_compaction_picker; mod min_overlap_compaction_picker; mod space_reclaim_compaction_picker; diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 6db71e93bcf64..ca5abbec9326b 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -54,16 +54,13 @@ impl HummockManager { pub(super) async fn build_compaction_group_manager( env: &MetaSrvEnv, ) -> Result> { - let default_config = match env.opts.compaction_config.as_ref() { - None => CompactionConfigBuilder::new().build(), - Some(opt) => CompactionConfigBuilder::with_opt(opt).build(), - }; - Self::build_compaction_group_manager_with_config(env, default_config).await + let opts = env.opts.compaction_config.clone(); + Self::build_compaction_group_manager_with_config(env, opts).await } pub(super) async fn build_compaction_group_manager_with_config( env: &MetaSrvEnv, - default_config: CompactionConfig, + default_config: risingwave_common::config::CompactionConfig, ) -> Result> { let compaction_group_manager = RwLock::new(CompactionGroupManager { compaction_groups: BTreeMap::new(), @@ -554,15 +551,15 @@ impl HummockManager { .generate::<{ IdCategory::CompactionGroup }>() .await?; // The new config will be persisted later. - let mut config = self - .compaction_group_manager - .read() - .await - .default_compaction_config(); + let manager = self.compaction_group_manager.read().await; + let opts = manager.default_compaction_config(); + let mut config = CompactionConfigBuilder::with_opt(&opts).build(); config.split_by_state_table = allow_split_by_table; if !allow_split_by_table { // TODO: remove it after we increase `max_bytes_for_level_base` for all group. - config.max_bytes_for_level_base *= 4; + config.max_bytes_for_level_base = opts.large_group_max_bytes_for_level_base; + config.sub_level_max_compaction_bytes = + opts.large_group_sub_level_max_compaction_bytes; config.split_weight_by_vnode = weight_split_by_vnode; } @@ -729,7 +726,7 @@ impl HummockManager { #[derive(Default)] pub(super) struct CompactionGroupManager { compaction_groups: BTreeMap, - default_config: CompactionConfig, + default_config: risingwave_common::config::CompactionConfig, } impl CompactionGroupManager { @@ -769,7 +766,10 @@ impl CompactionGroupManager { if compaction_groups.contains_key(id) { continue; } - let new_entry = CompactionGroup::new(*id, self.default_config.clone()); + let new_entry = CompactionGroup::new( + *id, + CompactionConfigBuilder::with_opt(&self.default_config).build(), + ); compaction_groups.insert(*id, new_entry); } let mut trx = Transaction::default(); @@ -791,8 +791,8 @@ impl CompactionGroupManager { self.compaction_groups.get(&compaction_group_id).cloned() } - pub(super) fn default_compaction_config(&self) -> CompactionConfig { - self.default_config.clone() + pub(super) fn default_compaction_config(&self) -> &risingwave_common::config::CompactionConfig { + &self.default_config } async fn update_compaction_config( diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index b00faaabe5a8f..6888df1776c24 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -287,17 +287,16 @@ where fragment_manager: FragmentManagerRef, metrics: Arc, compactor_manager: CompactorManagerRef, - config: CompactionConfig, + opts: risingwave_common::config::CompactionConfig, compactor_streams_change_tx: UnboundedSender<( u32, Streaming, )>, ) -> HummockManagerRef { use crate::manager::CatalogManager; - let compaction_group_manager = - Self::build_compaction_group_manager_with_config(&env, config) - .await - .unwrap(); + let compaction_group_manager = Self::build_compaction_group_manager_with_config(&env, opts) + .await + .unwrap(); let catalog_manager = Arc::new(CatalogManager::new(env.clone()).await.unwrap()); Self::new_impl( env, @@ -449,11 +448,12 @@ where checkpoint } else { // As no record found in stores, create a initial version. - let default_compaction_config = self - .compaction_group_manager - .read() - .await - .default_compaction_config(); + let default_compaction_config = { + let compaction_group_manager = self.compaction_group_manager.read().await; + let default_compaction = compaction_group_manager.default_compaction_config(); + CompactionConfigBuilder::with_opt(default_compaction).build() + }; + let checkpoint = create_init_version(default_compaction_config); tracing::info!("init hummock version checkpoint"); HummockVersionStats::default() @@ -2794,6 +2794,8 @@ fn init_selectors() -> HashMap> { type CompactionRequestChannelItem = (CompactionGroupId, compact_task::TaskType); use tokio::sync::mpsc::error::SendError; +use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; + #[derive(Debug, Default)] pub struct CompactionState { scheduled: Mutex>, diff --git a/src/meta/src/hummock/test_utils.rs b/src/meta/src/hummock/test_utils.rs index 14e262d40c083..7fcb3fd05e548 100644 --- a/src/meta/src/hummock/test_utils.rs +++ b/src/meta/src/hummock/test_utils.rs @@ -24,12 +24,9 @@ use risingwave_hummock_sdk::{ use risingwave_pb::common::{HostAddress, WorkerNode, WorkerType}; #[cfg(test)] use risingwave_pb::hummock::compact_task::TaskStatus; -use risingwave_pb::hummock::{ - CompactionConfig, HummockSnapshot, HummockVersion, KeyRange, SstableInfo, -}; +use risingwave_pb::hummock::{HummockSnapshot, HummockVersion, KeyRange, SstableInfo}; use risingwave_pb::meta::add_worker_node_request::Property; -use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; #[cfg(test)] use crate::hummock::compaction::default_level_selector; use crate::hummock::{CompactorManager, HummockManager, HummockManagerRef}; @@ -302,7 +299,7 @@ pub fn get_sorted_committed_object_ids( } pub async fn setup_compute_env_with_config( port: i32, - config: CompactionConfig, + opts: risingwave_common::config::CompactionConfig, ) -> ( MetaSrvEnv, HummockManagerRef, @@ -328,7 +325,7 @@ pub async fn setup_compute_env_with_config( fragment_manager, Arc::new(MetaMetrics::new()), compactor_manager, - config, + opts, compactor_streams_change_tx, ) .await; @@ -361,13 +358,14 @@ pub async fn setup_compute_env( ClusterManagerRef, WorkerNode, ) { - let config = CompactionConfigBuilder::new() - .level0_tier_compact_file_number(1) - .level0_max_compact_file_number(130) - .level0_sub_level_compact_level_count(1) - .level0_overlapping_sub_level_compact_level_count(1) - .build(); - setup_compute_env_with_config(port, config).await + let opts = risingwave_common::config::CompactionConfig { + level0_tier_compact_file_number: 1, + level0_max_compact_file_number: 130, + level0_sub_level_compact_level_count: 1, + level0_overlapping_sub_level_compact_level_count: 1, + ..Default::default() + }; + setup_compute_env_with_config(port, opts).await } pub async fn get_sst_ids( diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 38782e2715c15..54f885c90754a 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -299,7 +299,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { compaction_task_max_heartbeat_interval_secs: config .meta .compaction_task_max_heartbeat_interval_secs, - compaction_config: Some(config.meta.compaction_config), + compaction_config: config.meta.compaction_config, }, config.system.into_init_system_params(), ) diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 5a6c63016974a..7ce4c81813f45 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -149,7 +149,7 @@ pub struct MetaOpts { pub min_table_split_write_throughput: u64, pub compaction_task_max_heartbeat_interval_secs: u64, - pub compaction_config: Option, + pub compaction_config: CompactionConfig, } impl MetaOpts { @@ -186,7 +186,7 @@ impl MetaOpts { do_not_config_object_storage_lifecycle: true, partition_vnode_count: 32, compaction_task_max_heartbeat_interval_secs: 0, - compaction_config: None, + compaction_config: CompactionConfig::default(), } } } diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 40f023275c7a6..d5bbaf860e7d8 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -203,13 +203,14 @@ pub(crate) mod tests { #[tokio::test] async fn test_compaction_watermark() { - let config = CompactionConfigBuilder::new() - .level0_tier_compact_file_number(1) - .level0_max_compact_file_number(130) - .level0_overlapping_sub_level_compact_level_count(1) - .build(); + let opts = risingwave_common::config::CompactionConfig { + level0_tier_compact_file_number: 1, + level0_max_compact_file_number: 130, + level0_overlapping_sub_level_compact_level_count: 1, + ..Default::default() + }; let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = - setup_compute_env_with_config(8080, config).await; + setup_compute_env_with_config(8080, opts).await; let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( hummock_manager_ref.clone(), worker_node.id, diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index 72d354e3fd29b..4c5d14ef7f7cf 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -263,12 +263,13 @@ pub async fn compact_once( #[cfg(feature = "sync_point")] #[serial] async fn test_syncpoints_get_in_delete_range_boundary() { - let config = CompactionConfigBuilder::new() - .level0_tier_compact_file_number(1) - .max_bytes_for_level_base(4096) - .build(); + let opts = risingwave_common::config::CompactionConfig { + level0_tier_compact_file_number: 1, + max_bytes_for_level_base: 4096, + ..Default::default() + }; let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = - setup_compute_env_with_config(8080, config).await; + setup_compute_env_with_config(8080, opts).await; let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( hummock_manager_ref.clone(), worker_node.id, diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 7b890d2135e22..0745ddee1f4f4 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -86,9 +86,8 @@ pub fn start_delete_range(opts: CompactionTestOpts) -> Pin anyhow::Result<()> { let config = load_config(&opts.config_path, NoOverride); - let compaction_config = CompactionConfigBuilder::new().build(); compaction_test( - compaction_config, + risingwave_common::config::CompactionConfig::default(), config, &opts.state_store, 1000000, @@ -99,15 +98,16 @@ pub async fn compaction_test_main(opts: CompactionTestOpts) -> anyhow::Result<() } async fn compaction_test( - compaction_config: CompactionConfig, + opts: risingwave_common::config::CompactionConfig, config: RwConfig, state_store_type: &str, test_range: u64, test_count: u64, test_delete_ratio: u32, ) -> anyhow::Result<()> { + let compaction_config = CompactionConfigBuilder::with_opt(&opts).build(); let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) = - setup_compute_env_with_config(8080, compaction_config.clone()).await; + setup_compute_env_with_config(8080, opts).await; let meta_client = Arc::new(MockHummockMetaClient::new( hummock_manager_ref.clone(), worker_node.id, @@ -595,13 +595,15 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn test_small_data() { let config = RwConfig::default(); - let mut compaction_config = CompactionConfigBuilder::new().build(); - compaction_config.max_sub_compaction = 1; - compaction_config.level0_tier_compact_file_number = 2; - compaction_config.max_bytes_for_level_base = 512 * 1024; - compaction_config.sub_level_max_compaction_bytes = 256 * 1024; + let compaction_config = risingwave_common::config::CompactionConfig { + max_sub_compaction: 1, + level0_tier_compact_file_number: 1, + max_bytes_for_level_base: 512 * 1024, + sub_level_max_compaction_bytes: 256 * 1024, + ..Default::default() + }; compaction_test( - compaction_config.clone(), + compaction_config, config.clone(), "hummock+memory", 1000000, From ea6cf138964a2a4032fac70086aef7c12488e1e3 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 7 Aug 2023 14:59:02 +0800 Subject: [PATCH 02/17] switch builder without vnode change Signed-off-by: Little-Wallace --- .../picker/base_level_compaction_picker.rs | 17 +++++++++++++++-- .../src/hummock/sstable/multi_builder.rs | 14 ++++---------- .../compaction_test/src/delete_range_runner.rs | 2 +- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 471262cbcdbd2..2ae10762ba432 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -181,7 +181,7 @@ impl LevelCompactionPicker { let mut total_file_size = 0; let mut last_overlap_info = overlap_strategy.create_overlap_info(); let mut exist_small_task = false; - let max_target_file_size = std::cmp::min( + let max_target_level_size = std::cmp::min( self.config.max_compaction_bytes / 8, self.config.sub_level_max_compaction_bytes, ); @@ -189,7 +189,7 @@ impl LevelCompactionPicker { let pending_compact = level_handlers[target_level.level_idx as usize].is_pending_compact(&sst.sst_id); if !last_ssts.is_empty() - && (total_file_size + sst.file_size > max_target_file_size || pending_compact) + && (total_file_size + sst.file_size > max_target_level_size || pending_compact) { let picker = L0IncludeSstPicker::new( last_overlap_info, @@ -225,6 +225,19 @@ impl LevelCompactionPicker { total_file_size += sst.file_size; last_ssts.push(sst.clone()); } + if !last_ssts.is_empty() { + let picker = L0IncludeSstPicker::new( + last_overlap_info, + overlap_strategy.clone(), + self.config.max_compaction_bytes / 2, + ); + let input = picker.pick_tables(&l0.sub_levels, &level_handlers[0]); + if !input.sstable_infos.is_empty() && input.total_file_size > total_file_size { + min_write_amp_meet = true; + exist_small_task = true; + input_levels.push((input, total_file_size, std::mem::take(&mut last_ssts))); + } + } input_levels.sort_by_key(|(input, _, _)| input.sstable_infos.len()); input_levels.sort_by(|(a, _, _), (b, _, _)| { b.sstable_infos diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index d241aa84abe99..3ddb328e368ab 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -69,7 +69,6 @@ where task_progress: Option>, last_table_id: u32, - is_target_level_l0_or_lbase: bool, split_by_table: bool, split_weight_by_vnode: u32, /// When vnode of the coming key is greater than `largest_vnode_in_current_partition`, we will @@ -107,7 +106,6 @@ where compactor_metrics, task_progress, last_table_id: 0, - is_target_level_l0_or_lbase, split_by_table, split_weight_by_vnode, largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), @@ -123,7 +121,6 @@ where compactor_metrics: Arc::new(CompactorMetrics::unused()), task_progress: None, last_table_id: 0, - is_target_level_l0_or_lbase: false, split_by_table: false, split_weight_by_vnode: 0, largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), @@ -163,7 +160,8 @@ where value: HummockValue<&[u8]>, is_new_user_key: bool, ) -> HummockResult<()> { - let (switch_builder, vnode_changed) = self.check_table_and_vnode_change(&full_key.user_key); + let (switch_builder, _vnode_changed) = + self.check_table_and_vnode_change(&full_key.user_key); // We use this `need_seal_current` flag to store whether we need to call `seal_current` and // then call `seal_current` later outside the `if let` instead of calling @@ -177,12 +175,8 @@ where let mut last_range_tombstone_epoch = HummockEpoch::MAX; if let Some(builder) = self.current_builder.as_mut() { if is_new_user_key { - if switch_builder { - need_seal_current = true; - } else if builder.reach_capacity() || builder.reach_key_count() { - need_seal_current = self.split_weight_by_vnode == 0 - || (self.is_target_level_l0_or_lbase && vnode_changed); - } + need_seal_current = + switch_builder || builder.reach_capacity() || builder.reach_key_count(); } if need_seal_current && let Some(event) = builder.last_range_tombstone() && event.new_epoch != HummockEpoch::MAX { last_range_tombstone_epoch = event.new_epoch; diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 25bdbbe795e59..2656670b83880 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -36,7 +36,7 @@ use risingwave_meta::hummock::MockHummockMetaClient; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; use risingwave_pb::catalog::PbTable; -use risingwave_pb::hummock::{CompactionConfig, CompactionGroupInfo}; +use risingwave_pb::hummock::CompactionGroupInfo; use risingwave_pb::meta::SystemParams; use risingwave_rpc_client::HummockMetaClient; use risingwave_storage::filter_key_extractor::{ From c8c05f2f8734d1fb6ac3320d3bb67b5eb25813c8 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Mon, 7 Aug 2023 20:01:57 +0800 Subject: [PATCH 03/17] fix test Signed-off-by: Little-Wallace --- src/config/example.toml | 2 ++ src/meta/src/hummock/compaction/mod.rs | 8 +------ .../hummock/compaction/overlap_strategy.rs | 22 ++++++++++++------- .../manager/compaction_group_manager.rs | 2 +- .../hummock_test/src/compactor_tests.rs | 1 - .../src/delete_range_runner.rs | 1 - 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/config/example.toml b/src/config/example.toml index ce597f5317be7..e3d3193d29d79 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -52,6 +52,8 @@ level0_sub_level_compact_level_count = 3 level0_overlapping_sub_level_compact_level_count = 6 max_space_reclaim_bytes = 536870912 level0_max_compact_file_number = 96 +large_group_max_bytes_for_level_base = 2147483648 +large_group_sub_level_max_compaction_bytes = 402653184 [batch] enable_barrier_read = true diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 1e0b42e4c70d8..8d14cc0c33b0d 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -281,13 +281,7 @@ pub fn create_compaction_task( ) -> CompactionTask { let target_file_size = if input.target_level == 0 { compaction_config.target_file_size_base - } else if input.target_level == base_level { - // This is just a temporary optimization measure. We hope to reduce the size of SST as much - // as possible to reduce the amount of data blocked by a single task during compaction, - // but too many files will increase computing overhead. - // TODO: remove it after can reduce configuration `target_file_size_base`. - compaction_config.target_file_size_base / 4 - } else { + } else { assert!(input.target_level >= base_level); let step = (input.target_level - base_level) / 2; compaction_config.target_file_size_base << step diff --git a/src/meta/src/hummock/compaction/overlap_strategy.rs b/src/meta/src/hummock/compaction/overlap_strategy.rs index 8ce1d2c0a0c4e..6c5e31a03947e 100644 --- a/src/meta/src/hummock/compaction/overlap_strategy.rs +++ b/src/meta/src/hummock/compaction/overlap_strategy.rs @@ -117,19 +117,25 @@ impl OverlapInfo for RangeOverlapInfo { fn check_multiple_include(&self, others: &[SstableInfo]) -> Range { match self.target_range.as_ref() { Some(key_range) => { - let overlap_begin = others.partition_point(|table_status| { - KeyComparator::compare_encoded_full_key( - &table_status.key_range.as_ref().unwrap().left, - &key_range.left, - ) == cmp::Ordering::Less + let overlap_begin = others.partition_point(|sst| { + let ord = if key_range.left.is_empty() { + cmp::Ordering::Greater + } else { + KeyComparator::compare_encoded_full_key( + &sst.key_range.as_ref().unwrap().left, + &key_range.left, + ) + }; + ord == cmp::Ordering::Less }); if overlap_begin >= others.len() { return overlap_begin..overlap_begin; } let mut overlap_end = overlap_begin; - for table in &others[overlap_begin..] { - if key_range.compare_right_with(&table.key_range.as_ref().unwrap().right) - == cmp::Ordering::Less + for sst in &others[overlap_begin..] { + if !key_range.right.is_empty() + && key_range.compare_right_with(&sst.key_range.as_ref().unwrap().right) + == cmp::Ordering::Less { break; } diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index ca5abbec9326b..1315eea003a90 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -553,7 +553,7 @@ impl HummockManager { // The new config will be persisted later. let manager = self.compaction_group_manager.read().await; let opts = manager.default_compaction_config(); - let mut config = CompactionConfigBuilder::with_opt(&opts).build(); + let mut config = CompactionConfigBuilder::with_opt(opts).build(); config.split_by_state_table = allow_split_by_table; if !allow_split_by_table { // TODO: remove it after we increase `max_bytes_for_level_base` for all group. diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 472928ab28e59..ec58bbe7983b8 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -32,7 +32,6 @@ pub(crate) mod tests { use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{next_key, TABLE_PREFIX_LEN}; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; - use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use risingwave_meta::hummock::compaction::{default_level_selector, ManualCompactionOption}; use risingwave_meta::hummock::test_utils::{ register_table_ids_to_compaction_group, setup_compute_env, setup_compute_env_with_config, diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index 2656670b83880..b16e2d9921fe5 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -588,7 +588,6 @@ fn run_compactor_thread( mod tests { use risingwave_common::config::RwConfig; - use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use super::compaction_test; From cb176bd35a374c3fd840cf72d20bcfc5ce209048 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 8 Aug 2023 11:33:29 +0800 Subject: [PATCH 04/17] fix license Signed-off-by: Little-Wallace --- src/meta/src/hummock/compaction/mod.rs | 2 +- .../compaction/picker/include_sst_picker.rs | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index 8d14cc0c33b0d..b84fb975bb6aa 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -281,7 +281,7 @@ pub fn create_compaction_task( ) -> CompactionTask { let target_file_size = if input.target_level == 0 { compaction_config.target_file_size_base - } else { + } else { assert!(input.target_level >= base_level); let step = (input.target_level - base_level) / 2; compaction_config.target_file_size_base << step diff --git a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs index 132918a6a9e28..8004a72924a68 100644 --- a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs +++ b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs @@ -1,3 +1,17 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::sync::Arc; use risingwave_pb::hummock::Level; From cd4b250c048275bdc5bce5f544a4183f48cf3075 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 8 Aug 2023 13:03:55 +0800 Subject: [PATCH 05/17] fix include Signed-off-by: Little-Wallace --- .../src/hummock/compaction/level_selector.rs | 2 +- .../picker/base_level_compaction_picker.rs | 125 ++++++++++-------- .../compaction/picker/include_sst_picker.rs | 11 +- .../src/hummock/sstable/multi_builder.rs | 2 +- 4 files changed, 75 insertions(+), 65 deletions(-) diff --git a/src/meta/src/hummock/compaction/level_selector.rs b/src/meta/src/hummock/compaction/level_selector.rs index 797291a3871a2..096d197cd816b 100644 --- a/src/meta/src/hummock/compaction/level_selector.rs +++ b/src/meta/src/hummock/compaction/level_selector.rs @@ -241,7 +241,7 @@ impl DynamicLevelSelectorCore { .count() as u64; let non_overlapping_level_score = non_overlapping_level_count * SCORE_BASE / std::cmp::max( - base_level_sst_count / 32, + base_level_sst_count / 8, self.config.level0_sub_level_compact_level_count as u64, ); diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 2ae10762ba432..a311f8ef087c2 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -49,12 +49,10 @@ impl CompactionPicker for LevelCompactionPicker { return None; } - let is_l0_pending_compact = - level_handlers[0].is_level_all_pending_compact(&l0.sub_levels[0]); - - if is_l0_pending_compact { - stats.skip_by_pending_files += 1; - return None; + if let Some(ret) = + self.pick_l0_trivial_move_file(l0, levels.get_level(self.target_level), level_handlers) + { + return Some(ret); } debug_assert!(self.target_level == levels.get_level(self.target_level).level_idx as usize); @@ -67,11 +65,7 @@ impl CompactionPicker for LevelCompactionPicker { return Some(ret); } - if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], stats) { - return Some(ret); - } - - self.pick_l0_trivial_move_file(l0, level_handlers) + self.pick_l0_intra(l0, &level_handlers[0], stats) } } @@ -178,64 +172,48 @@ impl LevelCompactionPicker { } let mut last_ssts = vec![]; - let mut total_file_size = 0; - let mut last_overlap_info = overlap_strategy.create_overlap_info(); let mut exist_small_task = false; let max_target_level_size = std::cmp::min( self.config.max_compaction_bytes / 8, self.config.sub_level_max_compaction_bytes, ); for (idx, sst) in target_level.table_infos.iter().enumerate() { - let pending_compact = - level_handlers[target_level.level_idx as usize].is_pending_compact(&sst.sst_id); - if !last_ssts.is_empty() - && (total_file_size + sst.file_size > max_target_level_size || pending_compact) - { + let mut last_overlap_info = overlap_strategy.create_overlap_info(); + let mut total_file_size = 0; + for end in idx..target_level.table_infos.len() { + if end > idx && total_file_size + sst.file_size > max_target_level_size { + break; + } + if level_handlers[self.target_level] + .is_pending_compact(&target_level.table_infos[end].sst_id) + { + break; + } + total_file_size += sst.file_size; + let key_range = target_level.table_infos[end].key_range.as_ref().unwrap(); + if end > 0 && end + 1 < target_level.table_infos.len() { + last_overlap_info.update_key_range(key_range); + } else if end == 0 { + let mut key_range = key_range.clone(); + key_range.left.clear(); + last_overlap_info.update_key_range(&key_range); + } else { + let mut key_range = key_range.clone(); + key_range.right.clear(); + last_overlap_info.update_key_range(&key_range); + } + let picker = L0IncludeSstPicker::new( - last_overlap_info, overlap_strategy.clone(), self.config.max_compaction_bytes / 2, ); - last_overlap_info = overlap_strategy.create_overlap_info(); - let input = picker.pick_tables(&l0.sub_levels, &level_handlers[0]); - if !input.sstable_infos.is_empty() && input.total_file_size > total_file_size { + let input = + picker.pick_tables(&last_overlap_info, &l0.sub_levels, &level_handlers[0]); + if input.total_file_size > total_file_size && input.sstable_infos.len() > 1 { min_write_amp_meet = true; exist_small_task = true; input_levels.push((input, total_file_size, std::mem::take(&mut last_ssts))); } - last_ssts.clear(); - total_file_size = 0; - } - if pending_compact { - continue; - } - let key_range = sst.key_range.as_ref().unwrap(); - if idx > 0 && idx + 1 < target_level.table_infos.len() { - last_overlap_info.update_key_range(key_range); - } else if idx == 0 { - let mut key_range = key_range.clone(); - key_range.left.clear(); - last_overlap_info.update_key_range(&key_range); - } else { - let mut key_range = key_range.clone(); - key_range.right.clear(); - last_overlap_info.update_key_range(&key_range); - } - - total_file_size += sst.file_size; - last_ssts.push(sst.clone()); - } - if !last_ssts.is_empty() { - let picker = L0IncludeSstPicker::new( - last_overlap_info, - overlap_strategy.clone(), - self.config.max_compaction_bytes / 2, - ); - let input = picker.pick_tables(&l0.sub_levels, &level_handlers[0]); - if !input.sstable_infos.is_empty() && input.total_file_size > total_file_size { - min_write_amp_meet = true; - exist_small_task = true; - input_levels.push((input, total_file_size, std::mem::take(&mut last_ssts))); } } input_levels.sort_by_key(|(input, _, _)| input.sstable_infos.len()); @@ -399,10 +377,47 @@ impl LevelCompactionPicker { fn pick_l0_trivial_move_file( &self, l0: &OverlappingLevel, + target_level: &Level, level_handlers: &[LevelHandler], ) -> Option { + if l0.sub_levels.is_empty() { + return None; + } let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); + let min_overlap_picker = MinOverlappingPicker::new( + 0, + self.target_level, + self.config.sub_level_max_compaction_bytes, + false, + overlap_strategy.clone(), + ); + let (select_tables, target_tables) = min_overlap_picker.pick_tables( + &l0.sub_levels[0].table_infos, + &target_level.table_infos, + level_handlers, + ); + // only pick tables for trivial move + if !select_tables.is_empty() && target_tables.is_empty() { + let input_levels = vec![]; + return Some(CompactionInput { + input_levels: vec![ + InputLevel { + level_idx: 0, + level_type: LevelType::Nonoverlapping as i32, + table_infos: select_tables, + }, + InputLevel { + level_idx: self.target_level as u32, + level_type: LevelType::Nonoverlapping as i32, + table_infos: vec![], + }, + ], + target_level: self.target_level, + target_sub_level_id: 0, + }); + } + for (idx, level) in l0.sub_levels.iter().enumerate() { if level.level_type == LevelType::Overlapping as i32 || idx + 1 >= l0.sub_levels.len() { continue; diff --git a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs index 8004a72924a68..8712f6037b1d0 100644 --- a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs +++ b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs @@ -22,19 +22,13 @@ use crate::hummock::level_handler::LevelHandler; pub const MAX_LEVEL_COUNT: usize = 32; pub struct L0IncludeSstPicker { - overlap_info: Box, overlap_strategy: Arc, max_compact_size: u64, } impl L0IncludeSstPicker { - pub fn new( - overlap_info: Box, - overlap_strategy: Arc, - max_compact_size: u64, - ) -> Self { + pub fn new(overlap_strategy: Arc, max_compact_size: u64) -> Self { Self { - overlap_info, overlap_strategy, max_compact_size, } @@ -42,6 +36,7 @@ impl L0IncludeSstPicker { pub fn pick_tables( &self, + overlap_info: &Box, sub_levels: &[Level], level_handler: &LevelHandler, ) -> SubLevelSstables { @@ -53,7 +48,7 @@ impl L0IncludeSstPicker { { break; } - let mut range = self.overlap_info.check_multiple_include(&level.table_infos); + let mut range = overlap_info.check_multiple_include(&level.table_infos); for overlap in &overlaps { let old_range = overlap.check_multiple_include(&level.table_infos); range.start = std::cmp::max(range.start, old_range.start); diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 406352ef0bd6f..d4405807eaf9f 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -176,7 +176,7 @@ where if let Some(builder) = self.current_builder.as_mut() { if is_new_user_key { need_seal_current = - switch_builder || builder.reach_capacity() || builder.reach_key_count(); + switch_builder || builder.reach_capacity() || builder.reach_max_key_count(); } if need_seal_current && let Some(event) = builder.last_range_tombstone() && event.new_epoch != HummockEpoch::MAX { last_range_tombstone_epoch = event.new_epoch; From c542d1a152a9cff1f5692c84ed397daa6fb18834 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 8 Aug 2023 13:03:55 +0800 Subject: [PATCH 06/17] fix include Signed-off-by: Little-Wallace --- .../picker/base_level_compaction_picker.rs | 106 ++++++++---------- .../compaction/picker/include_sst_picker.rs | 11 +- 2 files changed, 58 insertions(+), 59 deletions(-) diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index a311f8ef087c2..c5128e16f2b9b 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -49,23 +49,50 @@ impl CompactionPicker for LevelCompactionPicker { return None; } - if let Some(ret) = - self.pick_l0_trivial_move_file(l0, levels.get_level(self.target_level), level_handlers) - { - return Some(ret); - } + let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); + + let target_level = levels.get_level(self.target_level); - debug_assert!(self.target_level == levels.get_level(self.target_level).level_idx as usize); - if let Some(ret) = self.pick_multi_level_to_base( - l0, - levels.get_level(self.target_level), + let min_overlap_picker = MinOverlappingPicker::new( + 0, + self.target_level, + self.config.sub_level_max_compaction_bytes, + false, + overlap_strategy.clone(), + ); + let (select_tables, target_tables) = min_overlap_picker.pick_tables( + &l0.sub_levels[0].table_infos, + &target_level.table_infos, level_handlers, - stats, - ) { + ); + // only pick tables for trivial move + if !select_tables.is_empty() && target_tables.is_empty() { + return Some(CompactionInput { + input_levels: vec![ + InputLevel { + level_idx: 0, + level_type: LevelType::Nonoverlapping as i32, + table_infos: select_tables, + }, + InputLevel { + level_idx: self.target_level as u32, + level_type: LevelType::Nonoverlapping as i32, + table_infos: vec![], + }, + ], + target_level: self.target_level, + target_sub_level_id: 0, + }); + } + debug_assert!(self.target_level == target_level.level_idx as usize); + if let Some(ret) = self.pick_multi_level_to_base(l0, target_level, level_handlers, stats) { return Some(ret); } - self.pick_l0_intra(l0, &level_handlers[0], stats) + if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], stats) { + return Some(ret); + } + self.pick_l0_trivial_move_file(l0, level_handlers) } } @@ -206,9 +233,13 @@ impl LevelCompactionPicker { let picker = L0IncludeSstPicker::new( overlap_strategy.clone(), self.config.max_compaction_bytes / 2, + self.config.level0_max_compact_file_number / 2, + ); + let input = picker.pick_tables( + last_overlap_info.as_ref(), + &l0.sub_levels, + &level_handlers[0], ); - let input = - picker.pick_tables(&last_overlap_info, &l0.sub_levels, &level_handlers[0]); if input.total_file_size > total_file_size && input.sstable_infos.len() > 1 { min_write_amp_meet = true; exist_small_task = true; @@ -377,47 +408,13 @@ impl LevelCompactionPicker { fn pick_l0_trivial_move_file( &self, l0: &OverlappingLevel, - target_level: &Level, level_handlers: &[LevelHandler], ) -> Option { if l0.sub_levels.is_empty() { return None; } let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); - - let min_overlap_picker = MinOverlappingPicker::new( - 0, - self.target_level, - self.config.sub_level_max_compaction_bytes, - false, - overlap_strategy.clone(), - ); - let (select_tables, target_tables) = min_overlap_picker.pick_tables( - &l0.sub_levels[0].table_infos, - &target_level.table_infos, - level_handlers, - ); // only pick tables for trivial move - if !select_tables.is_empty() && target_tables.is_empty() { - let input_levels = vec![]; - return Some(CompactionInput { - input_levels: vec![ - InputLevel { - level_idx: 0, - level_type: LevelType::Nonoverlapping as i32, - table_infos: select_tables, - }, - InputLevel { - level_idx: self.target_level as u32, - level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![], - }, - ], - target_level: self.target_level, - target_sub_level_id: 0, - }); - } - for (idx, level) in l0.sub_levels.iter().enumerate() { if level.level_type == LevelType::Overlapping as i32 || idx + 1 >= l0.sub_levels.len() { continue; @@ -508,7 +505,7 @@ pub mod tests { let config = Arc::new( CompactionConfigBuilder::new() .level0_tier_compact_file_number(2) - .level0_sub_level_compact_level_count(1) + .level0_sub_level_compact_level_count(2) .build(), ); LevelCompactionPicker::new(1, config) @@ -584,11 +581,10 @@ pub mod tests { ret.add_pending_task(1, &mut levels_handler); let mut local_stats = LocalPickerStatistic::default(); - // Cannot pick because no idle table in sub-level[0]. (And sub-level[0] is pending - // actually). push_table_level0_overlapping(&mut levels, generate_table(8, 1, 199, 233, 3)); - let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); - assert!(ret.is_none()); + let _ = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); // Don't pick overlapping sub-level 8 levels_handler[0].remove_task(1); @@ -709,10 +705,6 @@ pub mod tests { .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); ret.add_pending_task(0, &mut levels_handler); - let ret = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); - ret.add_pending_task(1, &mut levels_handler); push_tables_level0_nonoverlapping(&mut levels, vec![generate_table(3, 1, 250, 300, 3)]); let config: CompactionConfig = CompactionConfigBuilder::new() diff --git a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs index 8712f6037b1d0..969d50109e175 100644 --- a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs +++ b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs @@ -24,19 +24,25 @@ pub const MAX_LEVEL_COUNT: usize = 32; pub struct L0IncludeSstPicker { overlap_strategy: Arc, max_compact_size: u64, + max_file_count: u64, } impl L0IncludeSstPicker { - pub fn new(overlap_strategy: Arc, max_compact_size: u64) -> Self { + pub fn new( + overlap_strategy: Arc, + max_compact_size: u64, + max_file_count: u64, + ) -> Self { Self { overlap_strategy, max_compact_size, + max_file_count, } } pub fn pick_tables( &self, - overlap_info: &Box, + overlap_info: &dyn OverlapInfo, sub_levels: &[Level], level_handler: &LevelHandler, ) -> SubLevelSstables { @@ -45,6 +51,7 @@ impl L0IncludeSstPicker { for level in sub_levels { if ret.total_file_size > self.max_compact_size || ret.sstable_infos.len() >= MAX_LEVEL_COUNT + || ret.total_file_count as u64 > self.max_file_count { break; } From 32ead0b8eac9726da85bec2150878c216949c1c6 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 9 Aug 2023 13:00:26 +0800 Subject: [PATCH 07/17] fix test Signed-off-by: Little-Wallace --- .../picker/base_level_compaction_picker.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index c5128e16f2b9b..e8acaf81596c5 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -726,15 +726,16 @@ pub mod tests { // When picking L0->L0, L0's selecting_key_range should not be overlapped with L0's // compacting_key_range. let mut picker = create_compaction_picker_for_test(); + let sst = generate_table(3, 1, 200, 300, 2); let mut levels = Levels { levels: vec![Level { level_idx: 1, level_type: LevelType::Nonoverlapping as i32, - table_infos: vec![generate_table(3, 1, 200, 300, 2)], - total_file_size: 0, + total_file_size: sst.file_size, + uncompressed_file_size: sst.uncompressed_file_size, + table_infos: vec![sst], sub_level_id: 0, - uncompressed_file_size: 0, }], l0: Some(generate_l0_nonoverlapping_sublevels(vec![ generate_table(1, 1, 100, 210, 2), @@ -998,14 +999,11 @@ pub mod tests { #[test] fn test_l0_to_base_when_all_base_pending() { let l0 = generate_l0_nonoverlapping_multi_sublevels(vec![ - vec![ - generate_table(4, 1, 10, 90, 1), - generate_table(5, 1, 1000, 2000, 1), - ], + vec![generate_table(4, 1, 10, 90, 1)], vec![generate_table(6, 1, 10, 90, 1)], ]); - let levels = Levels { + let mut levels = Levels { l0: Some(l0), levels: vec![generate_level(1, vec![generate_table(3, 1, 10, 90, 1)])], member_table_ids: vec![1], @@ -1031,6 +1029,11 @@ pub mod tests { // trivial move do not be limited by level0_sub_level_compact_level_count ret.add_pending_task(0, &mut levels_handler); + let sst = generate_table(5, 1, 1000, 2000, 1); + levels.l0.as_mut().unwrap().sub_levels[0].total_file_size += sst.file_size; + levels.l0.as_mut().unwrap().sub_levels[0] + .table_infos + .push(sst); let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); From 92495113a5007695be944ede61aa17226be6ec6a Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 9 Aug 2023 16:00:02 +0800 Subject: [PATCH 08/17] fix check Signed-off-by: Little-Wallace --- src/storage/hummock_test/src/compactor_tests.rs | 5 +++++ src/storage/hummock_test/src/sync_point_tests.rs | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index ec58bbe7983b8..a4b7422dfb619 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -295,6 +295,11 @@ pub(crate) mod tests { table_infos: level.table_infos.clone(), }) .collect(); + compact_task.input_ssts.push(InputLevel { + level_idx: group.levels.last().unwrap().level_idx, + table_infos: group.levels.last().unwrap().table_infos.clone(), + level_type: group.levels.last().unwrap().level_type, + }); // assert compact_task assert_eq!(compact_task.input_ssts.len(), SST_COUNT as usize); diff --git a/src/storage/hummock_test/src/sync_point_tests.rs b/src/storage/hummock_test/src/sync_point_tests.rs index 4c5d14ef7f7cf..2ce20e3814d72 100644 --- a/src/storage/hummock_test/src/sync_point_tests.rs +++ b/src/storage/hummock_test/src/sync_point_tests.rs @@ -26,7 +26,6 @@ use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::key::{next_key, user_key}; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; use risingwave_hummock_sdk::HummockVersionId; -use risingwave_meta::hummock::compaction::compaction_config::CompactionConfigBuilder; use risingwave_meta::hummock::compaction::{default_level_selector, ManualCompactionOption}; use risingwave_meta::hummock::test_utils::{ add_ssts, register_table_ids_to_compaction_group, setup_compute_env, From 52e4b62fda97578d06a911db4281db03e0bb8813 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 9 Aug 2023 19:55:42 +0800 Subject: [PATCH 09/17] fix test Signed-off-by: Little-Wallace --- .../picker/base_level_compaction_picker.rs | 47 +++++++++++++++---- .../compaction/picker/include_sst_picker.rs | 8 ++-- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index e8acaf81596c5..3de3c269fa1b1 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -198,26 +198,26 @@ impl LevelCompactionPicker { return None; } - let mut last_ssts = vec![]; let mut exist_small_task = false; let max_target_level_size = std::cmp::min( self.config.max_compaction_bytes / 8, self.config.sub_level_max_compaction_bytes, ); - for (idx, sst) in target_level.table_infos.iter().enumerate() { + for idx in 0..target_level.table_infos.len() { let mut last_overlap_info = overlap_strategy.create_overlap_info(); let mut total_file_size = 0; + let mut target_level_files = vec![]; for end in idx..target_level.table_infos.len() { - if end > idx && total_file_size + sst.file_size > max_target_level_size { + let next_sst = &target_level.table_infos[end]; + if end > idx && total_file_size + next_sst.file_size > max_target_level_size { break; } - if level_handlers[self.target_level] - .is_pending_compact(&target_level.table_infos[end].sst_id) - { + if level_handlers[self.target_level].is_pending_compact(&next_sst.sst_id) { break; } - total_file_size += sst.file_size; - let key_range = target_level.table_infos[end].key_range.as_ref().unwrap(); + total_file_size += next_sst.file_size; + target_level_files.push(next_sst.clone()); + let key_range = next_sst.key_range.as_ref().unwrap(); if end > 0 && end + 1 < target_level.table_infos.len() { last_overlap_info.update_key_range(key_range); } else if end == 0 { @@ -240,10 +240,36 @@ impl LevelCompactionPicker { &l0.sub_levels, &level_handlers[0], ); + + if !input.sstable_infos.is_empty() { + let mut overlap_info = overlap_strategy.create_overlap_info(); + for (i, ssts) in input.sstable_infos.iter().enumerate().rev() { + if i + 1 != input.sstable_infos.len() { + let r = + overlap_info.check_multiple_overlap(&l0.sub_levels[i].table_infos); + for j in r { + assert!(ssts.iter().any( + |sst| sst.sst_id == l0.sub_levels[i].table_infos[j].sst_id + )); + } + } + for sst in ssts { + overlap_info.update(sst); + } + } + let r = overlap_info.check_multiple_overlap(&target_level.table_infos); + assert!( + idx <= r.start && r.end <= end + 1, + "[{},{}] found: {:?}", + idx, + end, + r + ); + } if input.total_file_size > total_file_size && input.sstable_infos.len() > 1 { min_write_amp_meet = true; exist_small_task = true; - input_levels.push((input, total_file_size, std::mem::take(&mut last_ssts))); + input_levels.push((input, total_file_size, target_level_files.clone())); } } } @@ -605,6 +631,9 @@ pub mod tests { .level0_tier_compact_file_number(2) .compaction_mode(CompactionMode::Range as i32) .level0_sub_level_compact_level_count(1) + .max_bytes_for_level_base(500) + .sub_level_max_compaction_bytes(100) + .max_compaction_bytes(1000) .build(), ); let mut picker = LevelCompactionPicker::new(1, config); diff --git a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs index 969d50109e175..b062f5582e740 100644 --- a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs +++ b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs @@ -64,10 +64,10 @@ impl L0IncludeSstPicker { if range.start >= range.end { break; } - let pending_compact = range - .any(|index| level_handler.is_pending_compact(&level.table_infos[index].sst_id)); - if pending_compact { - break; + for index in range.clone() { + if level_handler.is_pending_compact(&level.table_infos[index].sst_id) { + return ret; + } } let mut overlap = self.overlap_strategy.create_overlap_info(); ret.sstable_infos From cf2b891405f6c03560c257dc016a17891c8b6cc8 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Thu, 10 Aug 2023 23:21:39 +0800 Subject: [PATCH 10/17] test no new strategy Signed-off-by: Little-Wallace --- .../picker/base_level_compaction_picker.rs | 157 +++++++++--------- .../compaction/picker/include_sst_picker.rs | 21 ++- src/meta/src/hummock/manager/mod.rs | 21 ++- .../compaction_group/hummock_version_ext.rs | 10 +- .../src/hummock/sstable/multi_builder.rs | 20 ++- 5 files changed, 130 insertions(+), 99 deletions(-) diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 3de3c269fa1b1..145853aad5712 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -149,9 +149,14 @@ impl LevelCompactionPicker { return None; } + let max_target_level_size = std::cmp::min( + self.config.max_compaction_bytes / 8, + self.config.sub_level_max_compaction_bytes, + ); let mut skip_by_pending = false; let mut input_levels = vec![]; let mut min_write_amp_meet = false; + let mut min_overlap_meet = true; for input in l0_select_tables_vec { let l0_select_tables = input .sstable_infos @@ -184,8 +189,11 @@ impl LevelCompactionPicker { continue; } - if input.total_file_size >= target_level_size { + if target_level_size <= input.total_file_size { min_write_amp_meet = true; + if target_level_size <= max_target_level_size { + min_overlap_meet = true; + } } input_levels.push((input, target_level_size, target_level_ssts)); @@ -198,89 +206,64 @@ impl LevelCompactionPicker { return None; } - let mut exist_small_task = false; - let max_target_level_size = std::cmp::min( - self.config.max_compaction_bytes / 8, - self.config.sub_level_max_compaction_bytes, - ); - for idx in 0..target_level.table_infos.len() { - let mut last_overlap_info = overlap_strategy.create_overlap_info(); - let mut total_file_size = 0; - let mut target_level_files = vec![]; - for end in idx..target_level.table_infos.len() { - let next_sst = &target_level.table_infos[end]; - if end > idx && total_file_size + next_sst.file_size > max_target_level_size { - break; - } - if level_handlers[self.target_level].is_pending_compact(&next_sst.sst_id) { - break; - } - total_file_size += next_sst.file_size; - target_level_files.push(next_sst.clone()); - let key_range = next_sst.key_range.as_ref().unwrap(); - if end > 0 && end + 1 < target_level.table_infos.len() { - last_overlap_info.update_key_range(key_range); - } else if end == 0 { - let mut key_range = key_range.clone(); - key_range.left.clear(); - last_overlap_info.update_key_range(&key_range); - } else { - let mut key_range = key_range.clone(); - key_range.right.clear(); - last_overlap_info.update_key_range(&key_range); - } - - let picker = L0IncludeSstPicker::new( - overlap_strategy.clone(), - self.config.max_compaction_bytes / 2, - self.config.level0_max_compact_file_number / 2, - ); - let input = picker.pick_tables( - last_overlap_info.as_ref(), - &l0.sub_levels, - &level_handlers[0], - ); - - if !input.sstable_infos.is_empty() { - let mut overlap_info = overlap_strategy.create_overlap_info(); - for (i, ssts) in input.sstable_infos.iter().enumerate().rev() { - if i + 1 != input.sstable_infos.len() { - let r = - overlap_info.check_multiple_overlap(&l0.sub_levels[i].table_infos); - for j in r { - assert!(ssts.iter().any( - |sst| sst.sst_id == l0.sub_levels[i].table_infos[j].sst_id - )); - } - } - for sst in ssts { - overlap_info.update(sst); - } + let mut exist_small_task = min_overlap_meet; + if !min_overlap_meet { + for idx in 0..target_level.table_infos.len() { + let mut expand_overlap_info = overlap_strategy.create_overlap_info(); + let mut overlap_info = overlap_strategy.create_overlap_info(); + let mut total_file_size = 0; + let mut target_level_files = vec![]; + for end in idx..target_level.table_infos.len() { + let next_sst = &target_level.table_infos[end]; + if total_file_size + next_sst.file_size > max_target_level_size { + break; + } + if level_handlers[self.target_level].is_pending_compact(&next_sst.sst_id) { + break; } - let r = overlap_info.check_multiple_overlap(&target_level.table_infos); - assert!( - idx <= r.start && r.end <= end + 1, - "[{},{}] found: {:?}", - idx, - end, - r + total_file_size += next_sst.file_size; + target_level_files.push(next_sst.clone()); + let key_range = next_sst.key_range.as_ref().unwrap(); + overlap_info.update_key_range(key_range); + if end > 0 && end + 1 < target_level.table_infos.len() { + expand_overlap_info.update_key_range(key_range); + } else if end == 0 { + let mut key_range = key_range.clone(); + key_range.left.clear(); + expand_overlap_info.update_key_range(&key_range); + } else { + let mut key_range = key_range.clone(); + key_range.right.clear(); + expand_overlap_info.update_key_range(&key_range); + } + + let picker = L0IncludeSstPicker::new( + overlap_strategy.clone(), + self.config.max_compaction_bytes / 2, + self.config.level0_max_compact_file_number / 2, ); - } - if input.total_file_size > total_file_size && input.sstable_infos.len() > 1 { - min_write_amp_meet = true; - exist_small_task = true; - input_levels.push((input, total_file_size, target_level_files.clone())); + let input = picker.pick_tables( + expand_overlap_info.as_ref(), + overlap_info.as_ref(), + &l0.sub_levels, + &level_handlers[0], + ); + + if input.total_file_size > total_file_size && input.sstable_infos.len() > 1 { + min_write_amp_meet = true; + exist_small_task = true; + input_levels.push((input, total_file_size, target_level_files.clone())); + } } } + input_levels.sort_by(|(a, _, _), (b, _, _)| { + b.sstable_infos + .len() + .cmp(&a.sstable_infos.len()) + .then_with(|| a.total_file_count.cmp(&b.total_file_count)) + .then_with(|| a.total_file_size.cmp(&b.total_file_size)) + }); } - input_levels.sort_by_key(|(input, _, _)| input.sstable_infos.len()); - input_levels.sort_by(|(a, _, _), (b, _, _)| { - b.sstable_infos - .len() - .cmp(&a.sstable_infos.len()) - .then_with(|| a.total_file_count.cmp(&b.total_file_count)) - .then_with(|| a.total_file_size.cmp(&b.total_file_size)) - }); if !min_write_amp_meet && strict_check { // If the write-amplification of all candidate task are large, we may hope to wait base @@ -296,7 +279,7 @@ impl LevelCompactionPicker { continue; } - if exist_small_task && target_file_size > self.config.sub_level_max_compaction_bytes { + if exist_small_task && target_file_size > max_target_level_size { continue; } @@ -539,7 +522,15 @@ pub mod tests { #[test] fn test_compact_l0_to_l1() { - let mut picker = create_compaction_picker_for_test(); + let config = Arc::new( + CompactionConfigBuilder::new() + .level0_tier_compact_file_number(2) + .level0_sub_level_compact_level_count(2) + .max_bytes_for_level_base(200) + .sub_level_max_compaction_bytes(150) + .build(), + ); + let mut picker = LevelCompactionPicker::new(1, config); let l0 = generate_level( 0, vec![ @@ -864,7 +855,7 @@ pub mod tests { .max_compaction_bytes(500000) .sub_level_max_compaction_bytes(50000) .max_bytes_for_level_base(500000) - .level0_sub_level_compact_level_count(1) + .level0_sub_level_compact_level_count(2) .build(), ); // Only include sub-level 0 results will violate MAX_WRITE_AMPLIFICATION. @@ -898,11 +889,11 @@ pub mod tests { let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); - assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 6); assert_eq!( 2, ret.input_levels.iter().filter(|l| l.level_idx == 0).count() ); + assert_eq!(ret.input_levels[0].table_infos[0].get_sst_id(), 6); assert_eq!( 3, ret.input_levels diff --git a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs index b062f5582e740..9bc6e12006772 100644 --- a/src/meta/src/hummock/compaction/picker/include_sst_picker.rs +++ b/src/meta/src/hummock/compaction/picker/include_sst_picker.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use risingwave_pb::hummock::Level; +use risingwave_pb::hummock::{Level, LevelType}; use crate::hummock::compaction::overlap_strategy::{OverlapInfo, OverlapStrategy}; use crate::hummock::compaction::picker::min_overlap_compaction_picker::SubLevelSstables; @@ -42,21 +42,26 @@ impl L0IncludeSstPicker { pub fn pick_tables( &self, + include_info: &dyn OverlapInfo, overlap_info: &dyn OverlapInfo, sub_levels: &[Level], level_handler: &LevelHandler, ) -> SubLevelSstables { - let mut overlaps: Vec> = vec![]; + let mut include_infos: Vec> = vec![]; let mut ret = SubLevelSstables::default(); for level in sub_levels { - if ret.total_file_size > self.max_compact_size + if level.level_type() == LevelType::Overlapping + || ret.total_file_size > self.max_compact_size || ret.sstable_infos.len() >= MAX_LEVEL_COUNT || ret.total_file_count as u64 > self.max_file_count { break; } - let mut range = overlap_info.check_multiple_include(&level.table_infos); - for overlap in &overlaps { + let overlap_range = overlap_info.check_multiple_overlap(&level.table_infos); + let mut range = include_info.check_multiple_include(&level.table_infos); + range.start = std::cmp::max(range.start, overlap_range.start); + range.end = std::cmp::min(range.end, overlap_range.end); + for overlap in &include_infos { let old_range = overlap.check_multiple_include(&level.table_infos); range.start = std::cmp::max(range.start, old_range.start); range.end = std::cmp::min(range.end, old_range.end); @@ -64,7 +69,7 @@ impl L0IncludeSstPicker { if range.start >= range.end { break; } - for index in range.clone() { + for index in range.start..range.end { if level_handler.is_pending_compact(&level.table_infos[index].sst_id) { return ret; } @@ -72,7 +77,7 @@ impl L0IncludeSstPicker { let mut overlap = self.overlap_strategy.create_overlap_info(); ret.sstable_infos .push(level.table_infos[range.clone()].to_vec()); - for index in range { + for index in range.start..range.end { ret.total_file_count += 1; ret.total_file_size += level.table_infos[index].file_size; let key_range = level.table_infos[index].key_range.as_ref().unwrap(); @@ -89,7 +94,7 @@ impl L0IncludeSstPicker { } overlap.update_key_range(&key_range); } - overlaps.push(overlap); + include_infos.push(overlap); } ret } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index c2721727f99f1..7e34c32ba0c06 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -38,9 +38,9 @@ use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ HummockVersionUpdateExt, }; use risingwave_hummock_sdk::{ - version_checkpoint_path, CompactionGroupId, ExtendedSstableInfo, HummockCompactionTaskId, - HummockContextId, HummockEpoch, HummockSstableId, HummockSstableObjectId, HummockVersionId, - SstObjectIdRange, INVALID_VERSION_ID, + append_sstable_info_to_string, version_checkpoint_path, CompactionGroupId, ExtendedSstableInfo, + HummockCompactionTaskId, HummockContextId, HummockEpoch, HummockSstableId, + HummockSstableObjectId, HummockVersionId, SstObjectIdRange, INVALID_VERSION_ID, }; use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType}; use risingwave_pb::hummock::group_delta::DeltaType; @@ -892,6 +892,21 @@ where compact_task.sorted_output_ssts = compact_task.input_ssts[0].table_infos.clone(); // this task has been finished and `trivial_move_task` does not need to be schedule. compact_task.set_task_status(TaskStatus::Success); + if compact_task.target_level != 0 { + let mut output_info = String::default(); + for level in &compact_task.input_ssts { + for sst in &level.table_infos { + append_sstable_info_to_string(&mut output_info, sst); + } + } + tracing::info!( + "TrivialMove group-{} target level: {}, ssts: {}", + compaction_group_id, + compact_task.target_level, + output_info + ); + } + self.report_compact_task_impl(&mut compact_task, &mut compaction_guard, None) .await?; tracing::debug!( diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index c64f6f08a99dc..7a1416f873e71 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -1018,7 +1018,15 @@ fn level_insert_ssts(operand: &mut Level, insert_table_infos: Vec) if operand.level_type == LevelType::Overlapping as i32 { operand.level_type = LevelType::Nonoverlapping as i32; } - debug_assert!(can_concat(&operand.table_infos)); + debug_assert!( + can_concat(&operand.table_infos), + "output: {:?}", + operand + .table_infos + .iter() + .map(|sst| sst.sst_id) + .collect_vec() + ); } pub fn object_size_map(version: &HummockVersion) -> HashMap { diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index d4405807eaf9f..efe266ad94883 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -69,6 +69,7 @@ where task_progress: Option>, last_table_id: u32, + is_target_level_l0_or_lbase: bool, split_by_table: bool, split_weight_by_vnode: u32, /// When vnode of the coming key is greater than `largest_vnode_in_current_partition`, we will @@ -106,6 +107,7 @@ where compactor_metrics, task_progress, last_table_id: 0, + is_target_level_l0_or_lbase, split_by_table, split_weight_by_vnode, largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), @@ -121,6 +123,7 @@ where compactor_metrics: Arc::new(CompactorMetrics::unused()), task_progress: None, last_table_id: 0, + is_target_level_l0_or_lbase: false, split_by_table: false, split_weight_by_vnode: 0, largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), @@ -160,8 +163,7 @@ where value: HummockValue<&[u8]>, is_new_user_key: bool, ) -> HummockResult<()> { - let (switch_builder, _vnode_changed) = - self.check_table_and_vnode_change(&full_key.user_key); + let (switch_builder, vnode_changed) = self.check_table_and_vnode_change(&full_key.user_key); // We use this `need_seal_current` flag to store whether we need to call `seal_current` and // then call `seal_current` later outside the `if let` instead of calling @@ -175,8 +177,18 @@ where let mut last_range_tombstone_epoch = HummockEpoch::MAX; if let Some(builder) = self.current_builder.as_mut() { if is_new_user_key { - need_seal_current = - switch_builder || builder.reach_capacity() || builder.reach_max_key_count(); + if switch_builder { + need_seal_current = true; + } else if builder.reach_capacity() || builder.reach_max_key_count() { + if self.split_weight_by_vnode == 0 + || builder.reach_max_sst_size() + || builder.reach_max_key_count() + { + need_seal_current = true; + } else { + need_seal_current = self.is_target_level_l0_or_lbase && vnode_changed; + } + } } if need_seal_current && let Some(event) = builder.last_range_tombstone() && event.new_epoch != HummockEpoch::MAX { last_range_tombstone_epoch = event.new_epoch; From a46b3987108e85e597a09fe0a214526aab130062 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Aug 2023 13:28:59 +0800 Subject: [PATCH 11/17] fix test Signed-off-by: Little-Wallace --- .../compaction/picker/base_level_compaction_picker.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 145853aad5712..739f8909a4b19 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -48,6 +48,12 @@ impl CompactionPicker for LevelCompactionPicker { stats.skip_by_overlapping += 1; return None; } + let is_l0_pending_compact = + level_handlers[0].is_level_all_pending_compact(&l0.sub_levels[0]); + if is_l0_pending_compact { + stats.skip_by_pending_files += 1; + return None; + } let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); @@ -156,7 +162,7 @@ impl LevelCompactionPicker { let mut skip_by_pending = false; let mut input_levels = vec![]; let mut min_write_amp_meet = false; - let mut min_overlap_meet = true; + let mut min_overlap_meet = false; for input in l0_select_tables_vec { let l0_select_tables = input .sstable_infos From 18783699294a478dc9ac6de1ecf8eefc064eb7cc Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Aug 2023 13:58:11 +0800 Subject: [PATCH 12/17] fix test Signed-off-by: Little-Wallace --- .../compaction/picker/base_level_compaction_picker.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 739f8909a4b19..0fe8d6350f08c 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -605,9 +605,8 @@ pub mod tests { let mut local_stats = LocalPickerStatistic::default(); push_table_level0_overlapping(&mut levels, generate_table(8, 1, 199, 233, 3)); - let _ = picker - .pick_compaction(&levels, &levels_handler, &mut local_stats) - .unwrap(); + let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); + assert!(ret.is_none()); // Don't pick overlapping sub-level 8 levels_handler[0].remove_task(1); From 4452a1c729415a0011a69a3f7b049f01ef971807 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Aug 2023 15:39:15 +0800 Subject: [PATCH 13/17] fix test Signed-off-by: Little-Wallace --- .../picker/base_level_compaction_picker.rs | 67 ++++++++++--------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 0fe8d6350f08c..dc72fd4fafce4 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -162,7 +162,7 @@ impl LevelCompactionPicker { let mut skip_by_pending = false; let mut input_levels = vec![]; let mut min_write_amp_meet = false; - let mut min_overlap_meet = false; + let mut exist_small_task = false; for input in l0_select_tables_vec { let l0_select_tables = input .sstable_infos @@ -194,6 +194,7 @@ impl LevelCompactionPicker { if target_level_size > self.config.max_compaction_bytes && strict_check { continue; } + let mut min_overlap_meet = false; if target_level_size <= input.total_file_size { min_write_amp_meet = true; @@ -201,27 +202,15 @@ impl LevelCompactionPicker { min_overlap_meet = true; } } - - input_levels.push((input, target_level_size, target_level_ssts)); - } - - if input_levels.is_empty() { - if skip_by_pending { - stats.skip_by_pending_files += 1; - } - return None; - } - - let mut exist_small_task = min_overlap_meet; - if !min_overlap_meet { - for idx in 0..target_level.table_infos.len() { + if !min_overlap_meet { let mut expand_overlap_info = overlap_strategy.create_overlap_info(); let mut overlap_info = overlap_strategy.create_overlap_info(); let mut total_file_size = 0; let mut target_level_files = vec![]; - for end in idx..target_level.table_infos.len() { - let next_sst = &target_level.table_infos[end]; - if total_file_size + next_sst.file_size > max_target_level_size { + for next_sst in &target_level_ssts { + if total_file_size > 0 + && total_file_size + next_sst.file_size > max_target_level_size + { break; } if level_handlers[self.target_level].is_pending_compact(&next_sst.sst_id) { @@ -231,18 +220,22 @@ impl LevelCompactionPicker { target_level_files.push(next_sst.clone()); let key_range = next_sst.key_range.as_ref().unwrap(); overlap_info.update_key_range(key_range); - if end > 0 && end + 1 < target_level.table_infos.len() { - expand_overlap_info.update_key_range(key_range); - } else if end == 0 { + let is_first = + next_sst.sst_id == target_level.table_infos.first().unwrap().sst_id; + let is_end = next_sst.sst_id == target_level.table_infos.last().unwrap().sst_id; + if is_first { let mut key_range = key_range.clone(); key_range.left.clear(); expand_overlap_info.update_key_range(&key_range); - } else { + } else if is_end { let mut key_range = key_range.clone(); key_range.right.clear(); expand_overlap_info.update_key_range(&key_range); + } else { + expand_overlap_info.update_key_range(key_range); } - + } + if !target_level_files.is_empty() { let picker = L0IncludeSstPicker::new( overlap_strategy.clone(), self.config.max_compaction_bytes / 2, @@ -258,18 +251,26 @@ impl LevelCompactionPicker { if input.total_file_size > total_file_size && input.sstable_infos.len() > 1 { min_write_amp_meet = true; exist_small_task = true; - input_levels.push((input, total_file_size, target_level_files.clone())); + input_levels.push((input, total_file_size, target_level_files)); } } } - input_levels.sort_by(|(a, _, _), (b, _, _)| { - b.sstable_infos - .len() - .cmp(&a.sstable_infos.len()) - .then_with(|| a.total_file_count.cmp(&b.total_file_count)) - .then_with(|| a.total_file_size.cmp(&b.total_file_size)) - }); + input_levels.push((input, target_level_size, target_level_ssts)); + } + + if input_levels.is_empty() { + if skip_by_pending { + stats.skip_by_pending_files += 1; + } + return None; } + input_levels.sort_by(|(a, _, _), (b, _, _)| { + b.sstable_infos + .len() + .cmp(&a.sstable_infos.len()) + .then_with(|| a.total_file_count.cmp(&b.total_file_count)) + .then_with(|| a.total_file_size.cmp(&b.total_file_size)) + }); if !min_write_amp_meet && strict_check { // If the write-amplification of all candidate task are large, we may hope to wait base @@ -857,8 +858,8 @@ pub mod tests { // Pick with large max_compaction_bytes results all sub levels included in input. let config = Arc::new( CompactionConfigBuilder::new() - .max_compaction_bytes(500000) - .sub_level_max_compaction_bytes(50000) + .max_compaction_bytes(1000000) + .sub_level_max_compaction_bytes(120000) .max_bytes_for_level_base(500000) .level0_sub_level_compact_level_count(2) .build(), From 3bf0882df60f3f3435f5b8ec05bb036262edc601 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Aug 2023 17:13:26 +0800 Subject: [PATCH 14/17] fix sst Signed-off-by: Little-Wallace --- .../compaction/picker/base_level_compaction_picker.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index dc72fd4fafce4..2f0e921c7ad77 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -48,12 +48,6 @@ impl CompactionPicker for LevelCompactionPicker { stats.skip_by_overlapping += 1; return None; } - let is_l0_pending_compact = - level_handlers[0].is_level_all_pending_compact(&l0.sub_levels[0]); - if is_l0_pending_compact { - stats.skip_by_pending_files += 1; - return None; - } let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); From 6fe63c9a84541d852cbe6fede929be51f277c103 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Aug 2023 18:59:51 +0800 Subject: [PATCH 15/17] fix test Signed-off-by: Little-Wallace --- .../compaction/picker/base_level_compaction_picker.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index 2f0e921c7ad77..99f053d773f86 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -600,8 +600,9 @@ pub mod tests { let mut local_stats = LocalPickerStatistic::default(); push_table_level0_overlapping(&mut levels, generate_table(8, 1, 199, 233, 3)); - let ret = picker.pick_compaction(&levels, &levels_handler, &mut local_stats); - assert!(ret.is_none()); + let _ = picker + .pick_compaction(&levels, &levels_handler, &mut local_stats) + .unwrap(); // Don't pick overlapping sub-level 8 levels_handler[0].remove_task(1); From ad0bb0a6aaa395adb805e01eea55458941fd3874 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 25 Aug 2023 15:31:53 +0800 Subject: [PATCH 16/17] fix conflict Signed-off-by: Little-Wallace --- src/common/src/config.rs | 12 ------- .../picker/base_level_compaction_picker.rs | 21 +++++------- .../hummock_test/src/compactor_tests.rs | 34 +++++++++---------- 3 files changed, 26 insertions(+), 41 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 3f2441f371834..1a9edf2a879c3 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -1243,13 +1243,6 @@ pub mod default { pub fn level0_max_compact_file_number() -> u64 { DEFAULT_MAX_COMPACTION_FILE_COUNT } - - pub fn large_group_max_bytes_for_level_base() -> u64 { - DEFAULT_MAX_BYTES_FOR_LEVEL_BASE * 4 - } - pub fn large_group_sub_level_max_compaction_bytes() -> u64 { - DEFAULT_MIN_COMPACTION_BYTES * 3 - } } pub mod s3_objstore_config { @@ -1373,11 +1366,6 @@ pub struct CompactionConfig { pub max_space_reclaim_bytes: u64, #[serde(default = "default::compaction_config::level0_max_compact_file_number")] pub level0_max_compact_file_number: u64, - - #[serde(default = "default::compaction_config::large_group_max_bytes_for_level_base")] - pub large_group_max_bytes_for_level_base: u64, - #[serde(default = "default::compaction_config::large_group_sub_level_max_compaction_bytes")] - pub large_group_sub_level_max_compaction_bytes: u64, } #[cfg(test)] diff --git a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs index bf47802cafb0d..f6fe2ac0e6515 100644 --- a/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/base_level_compaction_picker.rs @@ -23,7 +23,6 @@ use super::min_overlap_compaction_picker::NonOverlapSubLevelPicker; use super::{CompactionInput, CompactionPicker, LocalPickerStatistic}; use crate::hummock::compaction::create_overlap_strategy; use crate::hummock::compaction::picker::include_sst_picker::L0IncludeSstPicker; -use crate::hummock::compaction::picker::MinOverlappingPicker; use crate::hummock::compaction::picker::TrivialMovePicker; use crate::hummock::level_handler::LevelHandler; @@ -50,10 +49,6 @@ impl CompactionPicker for LevelCompactionPicker { return None; } - let overlap_strategy = create_overlap_strategy(self.config.compaction_mode()); - - let target_level = levels.get_level(self.target_level); - if let Some(ret) = self.pick_base_trivial_move( l0, levels.get_level(self.target_level), @@ -68,7 +63,10 @@ impl CompactionPicker for LevelCompactionPicker { l0, levels.get_level(self.target_level), level_handlers, - ); + stats, + ) { + return Some(ret); + } if let Some(ret) = self.pick_l0_intra(l0, &level_handlers[0], stats) { return Some(ret); @@ -612,6 +610,7 @@ pub mod tests { assert_eq!(ret.input_levels[1].table_infos[0].get_sst_id(), 5); assert_eq!(ret.input_levels[2].table_infos.len(), 2); } + #[test] fn test_selecting_key_range_overlap() { // When picking L0->L1, all L1 files overlapped with selecting_key_range should be picked. @@ -1042,11 +1041,9 @@ pub mod tests { let ret = picker .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); - // println!("ret.input_levels: {:?}", ret.input_levels); // 1. trivial_move - assert_eq!(2, ret.input_levels.len()); - assert!(ret.input_levels[1].table_infos.is_empty()); - assert_eq!(5, ret.input_levels[0].table_infos[0].sst_id); + assert_eq!(3, ret.input_levels.len()); + assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id); ret.add_pending_task(0, &mut levels_handler); let sst = generate_table(5, 1, 1000, 2000, 1); levels.l0.as_mut().unwrap().sub_levels[0].total_file_size += sst.file_size; @@ -1057,8 +1054,8 @@ pub mod tests { .pick_compaction(&levels, &levels_handler, &mut local_stats) .unwrap(); println!("ret.input_levels: {:?}", ret.input_levels); - assert_eq!(3, ret.input_levels.len()); - assert_eq!(6, ret.input_levels[0].table_infos[0].sst_id); + assert_eq!(2, ret.input_levels.len()); + assert_eq!(5, ret.input_levels[0].table_infos[0].sst_id); } #[test] diff --git a/src/storage/hummock_test/src/compactor_tests.rs b/src/storage/hummock_test/src/compactor_tests.rs index 9175055330959..73b5b597ff439 100644 --- a/src/storage/hummock_test/src/compactor_tests.rs +++ b/src/storage/hummock_test/src/compactor_tests.rs @@ -275,23 +275,23 @@ pub(crate) mod tests { }, )]); compact_task.current_epoch_time = 0; - compact_task.input_ssts = group - .l0 - .as_ref() - .unwrap() - .sub_levels - .iter() - .map(|level| InputLevel { - level_idx: 0, - level_type: level.level_type, - table_infos: level.table_infos.clone(), - }) - .collect(); - compact_task.input_ssts.push(InputLevel { - level_idx: group.levels.last().unwrap().level_idx, - table_infos: group.levels.last().unwrap().table_infos.clone(), - level_type: group.levels.last().unwrap().level_type, - }); + compact_task.input_ssts = group + .l0 + .as_ref() + .unwrap() + .sub_levels + .iter() + .map(|level| InputLevel { + level_idx: 0, + level_type: level.level_type, + table_infos: level.table_infos.clone(), + }) + .collect(); + compact_task.input_ssts.push(InputLevel { + level_idx: group.levels.last().unwrap().level_idx, + table_infos: group.levels.last().unwrap().table_infos.clone(), + level_type: group.levels.last().unwrap().level_type, + }); let (_tx, rx) = tokio::sync::oneshot::channel(); let (mut result_task, task_stats) = compact(Arc::new(compact_ctx.clone()), compact_task.clone(), rx).await; From 0b34f541f576f3d9ad8d114fd6e3a0d7af6bc922 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 25 Aug 2023 15:59:00 +0800 Subject: [PATCH 17/17] fix config file Signed-off-by: Little-Wallace --- src/config/example.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/config/example.toml b/src/config/example.toml index 10ac4e74fb21f..84b11e4f77569 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -52,8 +52,6 @@ level0_sub_level_compact_level_count = 3 level0_overlapping_sub_level_compact_level_count = 6 max_space_reclaim_bytes = 536870912 level0_max_compact_file_number = 96 -large_group_max_bytes_for_level_base = 2147483648 -large_group_sub_level_max_compaction_bytes = 402653184 [batch] enable_barrier_read = true