diff --git a/proto/hummock.proto b/proto/hummock.proto index b0c0f61512503..de3c88b4e5892 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -307,9 +307,13 @@ message CompactTask { // Identifies whether the task is space_reclaim, if the compact_task_type increases, it will be refactored to enum TaskType task_type = 20; - bool split_by_state_table = 21; + + // Deprecated. use table_vnode_partition instead; + bool split_by_state_table = 21 [deprecated = true]; // Compaction needs to cut the state table every time 1/weight of vnodes in the table have been processed. - uint32 split_weight_by_vnode = 22; + // Deprecated. use table_vnode_partition instead; + uint32 split_weight_by_vnode = 22 [deprecated = true]; + map table_vnode_partition = 23; } message LevelHandler { diff --git a/src/common/src/config.rs b/src/common/src/config.rs index f8e49f06d1cb0..5c8c91db2a3a6 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -266,6 +266,9 @@ pub struct MetaConfig { #[serde(default = "default::meta::split_group_size_limit")] pub split_group_size_limit: u64, + #[serde(default = "default::meta::cut_table_size_limit")] + pub cut_table_size_limit: u64, + #[serde(default, flatten)] pub unrecognized: Unrecognized, @@ -292,6 +295,8 @@ pub struct MetaConfig { #[serde(default)] pub compaction_config: CompactionConfig, + #[serde(default = "default::meta::hybird_partition_vnode_count")] + pub hybird_partition_vnode_count: u32, #[serde(default = "default::meta::event_log_enabled")] pub event_log_enabled: bool, /// Keeps the latest N events per channel. @@ -973,7 +978,7 @@ pub mod default { } pub fn periodic_split_compact_group_interval_sec() -> u64 { - 180 // 3mi + 10 // 10s } pub fn periodic_tombstone_reclaim_compaction_interval_sec() -> u64 { @@ -1004,6 +1009,14 @@ pub mod default { 60 // 1min } + pub fn cut_table_size_limit() -> u64 { + 1024 * 1024 * 1024 // 1GB + } + + pub fn hybird_partition_vnode_count() -> u32 { + 4 + } + pub fn event_log_enabled() -> bool { true } diff --git a/src/config/example.toml b/src/config/example.toml index f0a461c22295c..f142c90eef750 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -34,14 +34,16 @@ backend = "Mem" periodic_space_reclaim_compaction_interval_sec = 3600 periodic_ttl_reclaim_compaction_interval_sec = 1800 periodic_tombstone_reclaim_compaction_interval_sec = 600 -periodic_split_compact_group_interval_sec = 180 +periodic_split_compact_group_interval_sec = 10 move_table_size_limit = 10737418240 split_group_size_limit = 68719476736 +cut_table_size_limit = 1073741824 do_not_config_object_storage_lifecycle = false partition_vnode_count = 64 table_write_throughput_threshold = 16777216 min_table_split_write_throughput = 4194304 compaction_task_max_heartbeat_interval_secs = 60 +hybird_partition_vnode_count = 4 event_log_enabled = true event_log_channel_max_size = 10 diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 2f080db760194..a25168d29dc9c 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -297,6 +297,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { .meta .compaction_task_max_heartbeat_interval_secs, compaction_config: Some(config.meta.compaction_config), + cut_table_size_limit: config.meta.cut_table_size_limit, + hybird_partition_vnode_count: config.meta.hybird_partition_vnode_count, event_log_enabled: config.meta.event_log_enabled, event_log_channel_max_size: config.meta.event_log_channel_max_size, advertise_addr: opts.advertise_addr, diff --git 
a/src/meta/src/hummock/compaction/mod.rs b/src/meta/src/hummock/compaction/mod.rs index aa4d16a4e1dcf..a08ffe26be49c 100644 --- a/src/meta/src/hummock/compaction/mod.rs +++ b/src/meta/src/hummock/compaction/mod.rs @@ -78,7 +78,6 @@ pub struct CompactionTask { pub compression_algorithm: String, pub target_file_size: u64, pub compaction_task_type: compact_task::TaskType, - pub enable_split_by_table: bool, } pub fn create_overlap_strategy(compaction_mode: CompactionMode) -> Arc { @@ -149,8 +148,8 @@ impl CompactStatus { current_epoch_time: 0, target_sub_level_id: ret.input.target_sub_level_id, task_type: ret.compaction_task_type as i32, - split_by_state_table: group.compaction_config.split_by_state_table, - split_weight_by_vnode: group.compaction_config.split_weight_by_vnode, + table_vnode_partition: BTreeMap::default(), + ..Default::default() }; Some(compact_task) } @@ -238,7 +237,6 @@ pub fn create_compaction_task( input, target_file_size, compaction_task_type, - enable_split_by_table: false, } } diff --git a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs index a44d4a571e04a..66c1a4fbee421 100644 --- a/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs +++ b/src/meta/src/hummock/compaction/picker/min_overlap_compaction_picker.rs @@ -28,7 +28,6 @@ pub struct MinOverlappingPicker { level: usize, target_level: usize, max_select_bytes: u64, - split_by_table: bool, overlap_strategy: Arc, } @@ -37,14 +36,12 @@ impl MinOverlappingPicker { level: usize, target_level: usize, max_select_bytes: u64, - split_by_table: bool, overlap_strategy: Arc, ) -> MinOverlappingPicker { MinOverlappingPicker { level, target_level, max_select_bytes, - split_by_table, overlap_strategy, } } @@ -66,9 +63,6 @@ impl MinOverlappingPicker { if level_handlers[self.level].is_pending_compact(&table.sst_id) { break; } - if self.split_by_table && table.table_ids != select_tables[left].table_ids { - break; - } if select_file_size > self.max_select_bytes { break; } @@ -405,13 +399,8 @@ pub mod tests { #[test] fn test_compact_l1() { - let mut picker = MinOverlappingPicker::new( - 1, - 2, - 10000, - false, - Arc::new(RangeOverlapStrategy::default()), - ); + let mut picker = + MinOverlappingPicker::new(1, 2, 10000, Arc::new(RangeOverlapStrategy::default())); let levels = vec![ Level { level_idx: 1, @@ -487,13 +476,8 @@ pub mod tests { #[test] fn test_expand_l1_files() { - let mut picker = MinOverlappingPicker::new( - 1, - 2, - 10000, - false, - Arc::new(RangeOverlapStrategy::default()), - ); + let mut picker = + MinOverlappingPicker::new(1, 2, 10000, Arc::new(RangeOverlapStrategy::default())); let levels = vec![ Level { level_idx: 1, @@ -833,7 +817,7 @@ pub mod tests { ]; // no limit let picker = - MinOverlappingPicker::new(2, 3, 1000, false, Arc::new(RangeOverlapStrategy::default())); + MinOverlappingPicker::new(2, 3, 1000, Arc::new(RangeOverlapStrategy::default())); let (select_files, target_files) = picker.pick_tables( &levels[1].table_infos, &levels[2].table_infos, diff --git a/src/meta/src/hummock/compaction/selector/level_selector.rs b/src/meta/src/hummock/compaction/selector/level_selector.rs index 47f8cb0b90ea2..13245b06bc1f5 100644 --- a/src/meta/src/hummock/compaction/selector/level_selector.rs +++ b/src/meta/src/hummock/compaction/selector/level_selector.rs @@ -121,7 +121,6 @@ impl DynamicLevelSelectorCore { picker_info.select_level, picker_info.target_level, self.config.max_bytes_for_level_base, - 
self.config.split_by_state_table, overlap_strategy, )) } diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index f3853c8d08df5..601c3d362afa3 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -44,7 +44,7 @@ use crate::hummock::error::{Error, Result}; use crate::hummock::manager::{drop_sst, read_lock, HummockManager}; use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat; use crate::hummock::model::CompactionGroup; -use crate::manager::{IdCategory, MetaSrvEnv}; +use crate::manager::{IdCategory, MetaSrvEnv, TableId}; use crate::model::{ BTreeMapEntryTransaction, BTreeMapTransaction, MetadataModel, TableFragments, ValTransaction, }; @@ -403,6 +403,7 @@ impl HummockManager { { self.try_update_write_limits(compaction_group_ids).await; } + Ok(result) } @@ -440,23 +441,30 @@ impl HummockManager { parent_group_id: CompactionGroupId, table_ids: &[StateTableId], ) -> Result { - self.move_state_table_to_compaction_group(parent_group_id, table_ids, None, false, 0) - .await + let result = self + .move_state_table_to_compaction_group(parent_group_id, table_ids, None, 0) + .await?; + self.group_to_table_vnode_partition + .write() + .insert(result.0, result.1); + + Ok(result.0) } /// move some table to another compaction-group. Create a new compaction group if it does not /// exist. + /// TODO: Move table_to_partition in result to compaction group #[named] pub async fn move_state_table_to_compaction_group( &self, parent_group_id: CompactionGroupId, table_ids: &[StateTableId], target_group_id: Option, - allow_split_by_table: bool, - weight_split_by_vnode: u32, - ) -> Result { + partition_vnode_count: u32, + ) -> Result<(CompactionGroupId, BTreeMap)> { + let mut table_to_partition = BTreeMap::default(); if table_ids.is_empty() { - return Ok(parent_group_id); + return Ok((parent_group_id, table_to_partition)); } let table_ids = table_ids.iter().cloned().unique().collect_vec(); let mut compaction_guard = write_lock!(self, compaction).await; @@ -561,8 +569,7 @@ impl HummockManager { .read() .await .default_compaction_config(); - config.split_by_state_table = allow_split_by_table; - config.split_weight_by_vnode = weight_split_by_vnode; + config.split_weight_by_vnode = partition_vnode_count; new_version_delta.group_deltas.insert( new_compaction_group_id, @@ -615,6 +622,11 @@ impl HummockManager { insert.apply_to_txn(&mut trx).await?; self.env.meta_store().txn(trx).await?; insert.commit(); + + // Currently, only splitting out a single table_id is supported. 
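+ // Record the dedicated group's partition count for every table that was split out, so that compactors can later cut this table's SSTs at vnode granularity.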
+ for table_id in table_ids { + table_to_partition.insert(table_id, partition_vnode_count); + } } else { self.env.meta_store().txn(trx).await?; } @@ -688,7 +700,7 @@ impl HummockManager { .with_label_values(&[&parent_group_id.to_string()]) .inc(); - Ok(target_compaction_group_id) + Ok((target_compaction_group_id, table_to_partition)) } #[named] diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index fac5a27653e9e..703470c3d6211 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -81,7 +81,8 @@ use crate::hummock::metrics_utils::{ }; use crate::hummock::{CompactorManagerRef, TASK_NORMAL}; use crate::manager::{ - CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, META_NODE_ID, + CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, IdCategory, MetaSrvEnv, TableId, + META_NODE_ID, }; use crate::model::{ BTreeMapEntryTransaction, BTreeMapTransaction, ClusterId, MetadataModel, ValTransaction, @@ -145,6 +146,19 @@ pub struct HummockManager { // `compaction_state` will record the types of compact tasks that can be triggered in `hummock` // and suggest types with a certain priority. pub compaction_state: CompactionState, + + // Record the partition corresponding to the table in each group (accepting delays) + // The compactor will refer to this structure to determine how to cut the boundaries of sst. + // Currently, we update it in a couple of scenarios + // 1. throughput and size are checked periodically and calculated according to the rules + // 2. A new group is created (split) + // 3. split_weight_by_vnode is modified for an existing group. (not supported yet) + // Tips: + // 1. When table_id does not exist in the current structure, compactor will not cut the boundary + // 2. When partition count <=1, compactor will still use table_id as the cutting boundary of sst + // 3. Modify the special configuration item hybrid_vnode_count = 0 to remove the table_id in hybrid cg and no longer perform alignment cutting. + group_to_table_vnode_partition: + parking_lot::RwLock>>, } pub type HummockManagerRef = Arc; @@ -421,6 +435,7 @@ impl HummockManager { history_table_throughput: parking_lot::RwLock::new(HashMap::default()), compactor_streams_change_tx, compaction_state: CompactionState::new(), + group_to_table_vnode_partition: parking_lot::RwLock::new(HashMap::default()), }; let instance = Arc::new(instance); instance.start_worker(rx).await; @@ -534,61 +549,9 @@ impl HummockManager { versioning_guard.objects_to_delete.clear(); versioning_guard.mark_objects_for_deletion(); - let all_group_ids = get_compaction_group_ids(&versioning_guard.current_version); - let mut configs = self - .compaction_group_manager - .write() - .await - .get_or_insert_compaction_group_configs( - &all_group_ids.collect_vec(), - self.env.meta_store(), - ) + self.initial_compaction_group_config_after_load(versioning_guard) .await?; - // We've already lowered the default limit for write limit in PR-12183, and to prevent older clusters from continuing to use the outdated configuration, we've introduced a new logic to rewrite it in a uniform way. 
- let mut rewrite_cg_ids = vec![]; - for (cg_id, compaction_group_config) in &mut configs { - // update write limit - let relaxed_default_write_stop_level_count = 1000; - if compaction_group_config - .compaction_config - .level0_sub_level_compact_level_count - == relaxed_default_write_stop_level_count - { - rewrite_cg_ids.push(*cg_id); - } - } - - if !rewrite_cg_ids.is_empty() { - tracing::info!("Compaction group {:?} configs rewrite ", rewrite_cg_ids); - - // update meta store - let result = self - .compaction_group_manager - .write() - .await - .update_compaction_config( - &rewrite_cg_ids, - &[ - mutable_config::MutableConfig::Level0StopWriteThresholdSubLevelNumber( - compaction_config::level0_stop_write_threshold_sub_level_number(), - ), - ], - self.env.meta_store(), - ) - .await?; - - // update memory - for new_config in result { - configs.insert(new_config.group_id(), new_config); - } - } - - versioning_guard.write_limit = - calc_new_write_limits(configs, HashMap::new(), &versioning_guard.current_version); - trigger_write_stop_stats(&self.metrics, &versioning_guard.write_limit); - tracing::debug!("Hummock stopped write: {:#?}", versioning_guard.write_limit); - Ok(()) } @@ -850,6 +813,14 @@ impl HummockManager { // lock in compaction_guard, take out all table_options in advance there may be a // waste of resources here, need to add a more efficient filter in catalog_manager let all_table_id_to_option = self.catalog_manager.get_all_table_options().await; + let mut table_to_vnode_partition = match self + .group_to_table_vnode_partition + .read() + .get(&compaction_group_id) + { + Some(table_to_vnode_partition) => table_to_vnode_partition.clone(), + None => BTreeMap::default(), + }; let mut compaction_guard = write_lock!(self, compaction).await; let compaction = compaction_guard.deref_mut(); @@ -934,12 +905,8 @@ impl HummockManager { }; compact_task.watermark = watermark; - compact_task.existing_table_ids = current_version - .levels - .get(&compaction_group_id) - .unwrap() - .member_table_ids - .clone(); + compact_task.existing_table_ids = member_table_ids.clone(); + let is_trivial_reclaim = CompactStatus::is_trivial_reclaim(&compact_task); let is_trivial_move = CompactStatus::is_trivial_move_task(&compact_task); @@ -1000,6 +967,10 @@ impl HummockManager { compact_task.current_epoch_time = Epoch::now().0; compact_task.compaction_filter_mask = group_config.compaction_config.compaction_filter_mask; + table_to_vnode_partition + .retain(|table_id, _| compact_task.existing_table_ids.contains(table_id)); + + compact_task.table_vnode_partition = table_to_vnode_partition; let mut compact_task_assignment = BTreeMapTransaction::new(&mut compaction.compact_task_assignment); @@ -2203,7 +2174,6 @@ impl HummockManager { )); split_group_trigger_interval .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - split_group_trigger_interval.reset(); let split_group_trigger = IntervalStream::new(split_group_trigger_interval) .map(|_| HummockTimerEvent::GroupSplit); @@ -2519,83 +2489,77 @@ impl HummockManager { let mut group_infos = self.calculate_compaction_group_statistic().await; group_infos.sort_by_key(|group| group.group_size); group_infos.reverse(); - let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into(); - let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into(); - let partition_vnode_count = self.env.opts.partition_vnode_count; - let window_size = HISTORY_TABLE_INFO_STATISTIC_TIME / (checkpoint_secs as usize); + const 
SPLIT_BY_TABLE: u32 = 1; + + let mut group_to_table_vnode_partition = self.group_to_table_vnode_partition.read().clone(); for group in &group_infos { if group.table_statistic.len() == 1 { + // no need to handle the separate compaciton group continue; } - for (table_id, table_size) in &group.table_statistic { - if !created_tables.contains(table_id) { - continue; - } - let mut is_high_write_throughput = false; - let mut is_low_write_throughput = true; - if let Some(history) = table_write_throughput.get(table_id) { - if history.len() >= window_size { - is_high_write_throughput = history.iter().all(|throughput| { - *throughput / checkpoint_secs - > self.env.opts.table_write_throughput_threshold - }); - is_low_write_throughput = history.iter().any(|throughput| { - *throughput / checkpoint_secs - < self.env.opts.min_table_split_write_throughput - }); - } - } - let state_table_size = *table_size; - - if is_low_write_throughput { - continue; - } - - if state_table_size < self.env.opts.min_table_split_size - && !is_high_write_throughput - { - continue; - } + let mut table_vnode_partition_mappoing = group_to_table_vnode_partition + .entry(group.group_id) + .or_default(); - let parent_group_id = group.group_id; + for (table_id, table_size) in &group.table_statistic { + let rule = self + .calculate_table_align_rule( + &table_write_throughput, + table_id, + table_size, + !created_tables.contains(table_id), + checkpoint_secs, + group.group_id, + group.group_size, + ) + .await; - // do not split a large table and a small table because it would increase IOPS - // of small table. - if parent_group_id != default_group_id && parent_group_id != mv_group_id { - let rest_group_size = group.group_size - state_table_size; - if rest_group_size < state_table_size - && rest_group_size < self.env.opts.min_table_split_size - { + match rule { + TableAlignRule::NoOptimization => { + table_vnode_partition_mappoing.remove(table_id); continue; } - } - let ret = self - .move_state_table_to_compaction_group( - parent_group_id, - &[*table_id], - None, - false, - partition_vnode_count, - ) - .await; - match ret { - Ok(new_group_id) => { - tracing::info!("move state table [{}] from group-{} to group-{} success, Allow split by table: false", table_id, parent_group_id, new_group_id); - return; + TableAlignRule::SplitByTable(table_id) => { + if self.env.opts.hybird_partition_vnode_count > 0 { + table_vnode_partition_mappoing.insert(table_id, SPLIT_BY_TABLE); + } else { + table_vnode_partition_mappoing.remove(&table_id); + } + } + + TableAlignRule::SplitByVnode((table_id, vnode)) => { + if self.env.opts.hybird_partition_vnode_count > 0 { + table_vnode_partition_mappoing.insert(table_id, vnode); + } else { + table_vnode_partition_mappoing.remove(&table_id); + } } - Err(e) => { - tracing::info!( - "failed to move state table [{}] from group-{} because {:?}", - table_id, - parent_group_id, - e - ) + + TableAlignRule::SplitToDedicatedCg(( + new_group_id, + table_vnode_partition_count, + )) => { + let _ = table_vnode_partition_mappoing; // drop + group_to_table_vnode_partition + .insert(new_group_id, table_vnode_partition_count); + + table_vnode_partition_mappoing = group_to_table_vnode_partition + .entry(group.group_id) + .or_default(); } } } } + + tracing::debug!( + "group_to_table_vnode_partition {:?}", + group_to_table_vnode_partition + ); + + // batch update group_to_table_vnode_partition + *self.group_to_table_vnode_partition.write() = group_to_table_vnode_partition; } #[named] @@ -2878,6 +2842,216 @@ impl HummockManager { None } 
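+ // Decides how a table's data should be aligned during compaction, based on its recent write throughput and its size: keep it in the current group and cut by vnode or by table, move it to a dedicated group, or leave it untouched.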
+ + async fn calculate_table_align_rule( + &self, + table_write_throughput: &HashMap>, + table_id: &u32, + table_size: &u64, + is_creating_table: bool, + checkpoint_secs: u64, + parent_group_id: u64, + group_size: u64, + ) -> TableAlignRule { + let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into(); + let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into(); + let partition_vnode_count = self.env.opts.partition_vnode_count; + let hybrid_vnode_count: u32 = self.env.opts.hybird_partition_vnode_count; + let window_size = HISTORY_TABLE_INFO_STATISTIC_TIME / (checkpoint_secs as usize); + + let mut is_high_write_throughput = false; + let mut is_low_write_throughput = true; + if let Some(history) = table_write_throughput.get(table_id) { + if !is_creating_table { + if history.len() >= window_size { + is_high_write_throughput = history.iter().all(|throughput| { + *throughput / checkpoint_secs + > self.env.opts.table_write_throughput_threshold + }); + is_low_write_throughput = history.iter().any(|throughput| { + *throughput / checkpoint_secs + < self.env.opts.min_table_split_write_throughput + }); + } + } else { + // For creating table, relax the checking restrictions to make the data alignment behavior more sensitive. + let sum = history.iter().sum::(); + is_low_write_throughput = sum + < self.env.opts.min_table_split_write_throughput + * history.len() as u64 + * checkpoint_secs; + } + } + + let state_table_size = *table_size; + let result = { + // When in a hybrid compaction group, data from multiple state tables may exist in a single sst, and in order to make the data in the sub level more aligned, a proactive cut is made for the data. + // https://github.com/risingwavelabs/risingwave/issues/13037 + // 1. In some scenario (like backfill), the creating state_table / mv may have high write throughput (creating table ). Therefore, we relax the limit of `is_low_write_throughput` and partition the table with high write throughput by vnode to improve the parallel efficiency of compaction. + // Add: creating table is not allowed to be split + // 2. For table with low throughput, partition by table_id to minimize amplification. + // 3. When the write mode is changed (the above conditions are not met), the default behavior is restored + if !is_low_write_throughput { + TableAlignRule::SplitByVnode((*table_id, hybrid_vnode_count)) + } else if state_table_size > self.env.opts.cut_table_size_limit { + TableAlignRule::SplitByTable(*table_id) + } else { + TableAlignRule::NoOptimization + } + }; + + // 1. Avoid splitting a creating table + // 2. Avoid splitting a is_low_write_throughput creating table + // 3. Avoid splitting a non-high throughput medium-sized table + if is_creating_table + || (is_low_write_throughput) + || (state_table_size < self.env.opts.min_table_split_size && !is_high_write_throughput) + { + return result; + } + + // do not split a large table and a small table because it would increase IOPS + // of small table. 
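+ // `rest_group_size` below is the size of everything else in the parent group; when that remainder is both smaller than this table and below the split threshold, keep the table where it is.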
+ if parent_group_id != default_group_id && parent_group_id != mv_group_id { + let rest_group_size = group_size - state_table_size; + if rest_group_size < state_table_size + && rest_group_size < self.env.opts.min_table_split_size + { + return result; + } + } + + let ret = self + .move_state_table_to_compaction_group( + parent_group_id, + &[*table_id], + None, + partition_vnode_count, + ) + .await; + match ret { + Ok((new_group_id, table_vnode_partition_count)) => { + tracing::info!("move state table [{}] from group-{} to group-{} success, Allow split by table: false", table_id, parent_group_id, new_group_id); + return TableAlignRule::SplitToDedicatedCg(( + new_group_id, + table_vnode_partition_count, + )); + } + Err(e) => { + tracing::info!( + "failed to move state table [{}] from group-{} because {:?}", + table_id, + parent_group_id, + e + ) + } + } + + TableAlignRule::NoOptimization + } + + async fn initial_compaction_group_config_after_load( + &self, + versioning_guard: &mut RwLockWriteGuard<'_, Versioning>, + ) -> Result<()> { + // 1. Due to version compatibility, we fix some of the configuration of older versions after hummock starts. + let current_version = &versioning_guard.current_version; + let all_group_ids = get_compaction_group_ids(current_version); + let mut configs = self + .compaction_group_manager + .write() + .await + .get_or_insert_compaction_group_configs( + &all_group_ids.collect_vec(), + self.env.meta_store(), + ) + .await?; + + // We've already lowered the default limit for write limit in PR-12183, and to prevent older clusters from continuing to use the outdated configuration, we've introduced a new logic to rewrite it in a uniform way. + let mut rewrite_cg_ids = vec![]; + let mut restore_cg_to_partition_vnode: HashMap> = + HashMap::default(); + for (cg_id, compaction_group_config) in &mut configs { + // update write limit + let relaxed_default_write_stop_level_count = 1000; + if compaction_group_config + .compaction_config + .level0_sub_level_compact_level_count + == relaxed_default_write_stop_level_count + { + rewrite_cg_ids.push(*cg_id); + } + + if let Some(levels) = current_version.get_levels().get(cg_id) { + if levels.member_table_ids.len() == 1 { + restore_cg_to_partition_vnode.insert( + *cg_id, + vec![( + levels.member_table_ids[0], + compaction_group_config + .compaction_config + .split_weight_by_vnode, + )] + .into_iter() + .collect(), + ); + } + } + } + + if !rewrite_cg_ids.is_empty() { + tracing::info!("Compaction group {:?} configs rewrite ", rewrite_cg_ids); + + // update meta store + let result = self + .compaction_group_manager + .write() + .await + .update_compaction_config( + &rewrite_cg_ids, + &[ + mutable_config::MutableConfig::Level0StopWriteThresholdSubLevelNumber( + compaction_config::level0_stop_write_threshold_sub_level_number(), + ), + ], + self.env.meta_store(), + ) + .await?; + + // update memory + for new_config in result { + configs.insert(new_config.group_id(), new_config); + } + } + + versioning_guard.write_limit = + calc_new_write_limits(configs, HashMap::new(), &versioning_guard.current_version); + trigger_write_stop_stats(&self.metrics, &versioning_guard.write_limit); + tracing::debug!("Hummock stopped write: {:#?}", versioning_guard.write_limit); + + { + // 2. Restore the memory data structure according to the memory of the compaction group config. 
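+ // Only groups holding a single state table are restored here, reusing the persisted split_weight_by_vnode as that table's partition count.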
+ let mut group_to_table_vnode_partition = self.group_to_table_vnode_partition.write(); + for (cg_id, table_vnode_partition) in restore_cg_to_partition_vnode { + group_to_table_vnode_partition.insert(cg_id, table_vnode_partition); + } + } + + Ok(()) + } +} + +// This structure describes how hummock handles sst switching in a compaction group. A better sst cut will result in better data alignment, which in turn will improve the efficiency of the compaction. +// By adopting certain rules, a better sst cut will lead to better data alignment and thus improve the efficiency of the compaction. +pub enum TableAlignRule { + // The table_id is not optimized for alignment. + NoOptimization, + // Move the table_id to a separate compaction group. Currently, the system only supports separate compaction with one table. + SplitToDedicatedCg((CompactionGroupId, BTreeMap)), + // In the current group, partition the table's data according to the granularity of the vnode. + SplitByVnode((TableId, u32)), + // In the current group, partition the table's data at the granularity of the table. + SplitByTable(TableId), } fn drop_sst( diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 1a68461dc96b0..e9076a8a7c591 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -3143,6 +3143,7 @@ impl CatalogManager { .database .tables .values() + .filter(|table| table.get_stream_job_status() != Ok(StreamJobStatus::Creating)) .map(|table| table.id) .collect() } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 8507a75839cf2..31a9cad8f316e 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -178,6 +178,19 @@ pub struct MetaOpts { pub compaction_task_max_heartbeat_interval_secs: u64, pub compaction_config: Option, + + /// The size limit to split a state-table to independent sstable. + pub cut_table_size_limit: u64, + + /// hybird compaction group config + /// + /// hybird_partition_vnode_count determines the granularity of vnodes in the hybrid compaction group for SST alignment. 
+ /// When hybird_partition_vnode_count > 0, in hybrid compaction group + /// - Tables with high write throughput will be split at vnode granularity + /// - Tables with high size tables will be split by table granularity + /// When hybird_partition_vnode_count = 0,no longer be special alignment operations for the hybird compaction group + pub hybird_partition_vnode_count: u32, + pub event_log_enabled: bool, pub event_log_channel_max_size: u32, pub advertise_addr: String, @@ -223,6 +236,8 @@ impl MetaOpts { partition_vnode_count: 32, compaction_task_max_heartbeat_interval_secs: 0, compaction_config: None, + cut_table_size_limit: 1024 * 1024 * 1024, + hybird_partition_vnode_count: 4, event_log_enabled: false, event_log_channel_max_size: 1, advertise_addr: "".to_string(), diff --git a/src/storage/benches/bench_compactor.rs b/src/storage/benches/bench_compactor.rs index 16d4f603cc93a..87d90e19f3bf8 100644 --- a/src/storage/benches/bench_compactor.rs +++ b/src/storage/benches/bench_compactor.rs @@ -193,7 +193,6 @@ async fn compact>(iter: I, sstable_store watermark: 0, stats_target_table_ids: None, task_type: compact_task::TaskType::Dynamic, - split_weight_by_vnode: 0, use_block_based_filter: true, ..Default::default() }; diff --git a/src/storage/src/hummock/compactor/compaction_utils.rs b/src/storage/src/hummock/compactor/compaction_utils.rs index d0cd96a831d80..53df3fd9de482 100644 --- a/src/storage/src/hummock/compactor/compaction_utils.rs +++ b/src/storage/src/hummock/compactor/compaction_utils.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{BTreeMap, HashSet}; use std::marker::PhantomData; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -119,9 +119,9 @@ pub struct TaskConfig { pub stats_target_table_ids: Option>, pub task_type: compact_task::TaskType, pub is_target_l0_or_lbase: bool, - pub split_by_table: bool, - pub split_weight_by_vnode: u32, pub use_block_based_filter: bool, + + pub table_vnode_partition: BTreeMap, } pub fn build_multi_compaction_filter(compact_task: &CompactTask) -> MultiCompactionFilter { diff --git a/src/storage/src/hummock/compactor/compactor_runner.rs b/src/storage/src/hummock/compactor/compactor_runner.rs index e9848262da826..d4f7e95d34c05 100644 --- a/src/storage/src/hummock/compactor/compactor_runner.rs +++ b/src/storage/src/hummock/compactor/compactor_runner.rs @@ -105,9 +105,8 @@ impl CompactorRunner { task_type: task.task_type(), is_target_l0_or_lbase: task.target_level == 0 || task.target_level == task.base_level, - split_by_table: task.split_by_state_table, - split_weight_by_vnode: task.split_weight_by_vnode, use_block_based_filter, + table_vnode_partition: task.table_vnode_partition.clone(), }, object_id_getter, ); diff --git a/src/storage/src/hummock/compactor/fast_compactor_runner.rs b/src/storage/src/hummock/compactor/fast_compactor_runner.rs index a46ae043156af..f769699045eb8 100644 --- a/src/storage/src/hummock/compactor/fast_compactor_runner.rs +++ b/src/storage/src/hummock/compactor/fast_compactor_runner.rs @@ -313,8 +313,7 @@ impl CompactorRunner { stats_target_table_ids: Some(HashSet::from_iter(task.existing_table_ids.clone())), task_type: task.task_type(), is_target_l0_or_lbase: task.target_level == 0 || task.target_level == task.base_level, - split_by_table: task.split_by_state_table, - split_weight_by_vnode: task.split_weight_by_vnode, + table_vnode_partition: 
task.table_vnode_partition.clone(), use_block_based_filter: true, }; let factory = UnifiedSstableWriterFactory::new(context.sstable_store.clone()); @@ -334,8 +333,7 @@ impl CompactorRunner { context.compactor_metrics.clone(), Some(task_progress.clone()), task_config.is_target_l0_or_lbase, - task_config.split_by_table, - task_config.split_weight_by_vnode, + task_config.table_vnode_partition.clone(), ); assert_eq!( task.input_ssts.len(), diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 66ac4b5f480c5..156bc55f9e67a 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -266,8 +266,7 @@ impl Compactor { self.context.compactor_metrics.clone(), task_progress.clone(), self.task_config.is_target_l0_or_lbase, - self.task_config.split_by_table, - self.task_config.split_weight_by_vnode, + self.task_config.table_vnode_partition.clone(), ); let compaction_statistics = compact_and_build_sst( &mut sst_builder, diff --git a/src/storage/src/hummock/compactor/shared_buffer_compact.rs b/src/storage/src/hummock/compactor/shared_buffer_compact.rs index b0efa1ac067da..3051d22edab02 100644 --- a/src/storage/src/hummock/compactor/shared_buffer_compact.rs +++ b/src/storage/src/hummock/compactor/shared_buffer_compact.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::ops::Bound; use std::sync::Arc; @@ -229,13 +229,21 @@ async fn compact_shared_buffer( let mut compaction_futures = vec![]; let use_block_based_filter = BlockedXor16FilterBuilder::is_kv_count_too_large(total_key_count); + let table_vnode_partition = if existing_table_ids.len() == 1 { + let table_id = existing_table_ids.iter().next().unwrap(); + vec![(*table_id, split_weight_by_vnode as u32)] + .into_iter() + .collect() + } else { + BTreeMap::default() + }; for (split_index, key_range) in splits.into_iter().enumerate() { let compactor = SharedBufferCompactRunner::new( split_index, key_range, context.clone(), sub_compaction_sstable_size as usize, - split_weight_by_vnode as u32, + table_vnode_partition.clone(), use_block_based_filter, Box::new(sstable_object_id_manager.clone()), ); @@ -479,7 +487,7 @@ impl SharedBufferCompactRunner { key_range: KeyRange, context: CompactorContext, sub_compaction_sstable_size: usize, - split_weight_by_vnode: u32, + table_vnode_partition: BTreeMap, use_block_based_filter: bool, object_id_getter: Box, ) -> Self { @@ -496,8 +504,7 @@ impl SharedBufferCompactRunner { stats_target_table_ids: None, task_type: compact_task::TaskType::SharedBuffer, is_target_l0_or_lbase: true, - split_by_table: false, - split_weight_by_vnode, + table_vnode_partition, use_block_based_filter, }, object_id_getter, diff --git a/src/storage/src/hummock/sstable/multi_builder.rs b/src/storage/src/hummock/sstable/multi_builder.rs index 5b2b914e6a9f7..b90cc2239d428 100644 --- a/src/storage/src/hummock/sstable/multi_builder.rs +++ b/src/storage/src/hummock/sstable/multi_builder.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::BTreeMap; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering::SeqCst; use std::sync::Arc; @@ -72,7 +73,7 @@ where last_table_id: u32, is_target_level_l0_or_lbase: bool, - split_by_table: bool, + table_partition_vnode: BTreeMap, split_weight_by_vnode: u32, /// When vnode of the coming key is greater than `largest_vnode_in_current_partition`, we will /// switch SST. @@ -91,17 +92,8 @@ where compactor_metrics: Arc, task_progress: Option>, is_target_level_l0_or_lbase: bool, - mut split_by_table: bool, - mut split_weight_by_vnode: u32, + table_partition_vnode: BTreeMap, ) -> Self { - if !is_target_level_l0_or_lbase { - split_weight_by_vnode = 0; - } - - if split_weight_by_vnode > 0 { - split_by_table = true; - } - Self { builder_factory, sst_outputs: Vec::new(), @@ -110,8 +102,8 @@ where task_progress, last_table_id: 0, is_target_level_l0_or_lbase, - split_by_table, - split_weight_by_vnode, + table_partition_vnode, + split_weight_by_vnode: 0, largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), last_vnode: 0, } @@ -126,7 +118,7 @@ where task_progress: None, last_table_id: 0, is_target_level_l0_or_lbase: false, - split_by_table: false, + table_partition_vnode: BTreeMap::default(), split_weight_by_vnode: 0, largest_vnode_in_current_partition: VirtualNode::MAX.to_index(), last_vnode: 0, @@ -206,7 +198,11 @@ where if switch_builder { need_seal_current = true; } else if builder.reach_capacity() { - if self.split_weight_by_vnode == 0 || builder.reach_max_sst_size() { + let is_split_table = self + .table_partition_vnode + .contains_key(&full_key.user_key.table_id.table_id()); + + if !is_split_table || builder.reach_max_sst_size() { need_seal_current = true; } else { need_seal_current = self.is_target_level_l0_or_lbase && vnode_changed; @@ -264,15 +260,31 @@ where pub fn check_table_and_vnode_change(&mut self, user_key: &UserKey<&[u8]>) -> (bool, bool) { let mut switch_builder = false; let mut vnode_changed = false; - if self.split_by_table && user_key.table_id.table_id != self.last_table_id { - // table_id change - self.last_table_id = user_key.table_id.table_id; - switch_builder = true; - self.last_vnode = 0; - vnode_changed = true; - if self.split_weight_by_vnode > 1 { - self.largest_vnode_in_current_partition = - VirtualNode::COUNT / (self.split_weight_by_vnode as usize) - 1; + if user_key.table_id.table_id != self.last_table_id { + let new_vnode_partition_count = + self.table_partition_vnode.get(&user_key.table_id.table_id); + + if new_vnode_partition_count.is_some() + || self.table_partition_vnode.contains_key(&self.last_table_id) + { + if new_vnode_partition_count.is_some() { + self.split_weight_by_vnode = *new_vnode_partition_count.unwrap(); + } else { + self.split_weight_by_vnode = 0; + } + + // table_id change + self.last_table_id = user_key.table_id.table_id; + switch_builder = true; + self.last_vnode = 0; + vnode_changed = true; + if self.split_weight_by_vnode > 1 { + self.largest_vnode_in_current_partition = + VirtualNode::COUNT / (self.split_weight_by_vnode as usize) - 1; + } else { + // default + self.largest_vnode_in_current_partition = VirtualNode::MAX.to_index(); + } } } if self.largest_vnode_in_current_partition != VirtualNode::MAX.to_index() { @@ -609,8 +621,7 @@ mod tests { Arc::new(CompactorMetrics::unused()), None, false, - false, - 0, + BTreeMap::default(), ); let full_key = FullKey::for_test( table_id, @@ -707,8 +718,7 @@ mod tests { Arc::new(CompactorMetrics::unused()), None, false, - false, - 0, + BTreeMap::default(), ); 
del_iter.rewind().await.unwrap(); assert!(is_max_epoch(del_iter.earliest_epoch())); @@ -745,8 +755,7 @@ mod tests { Arc::new(CompactorMetrics::unused()), None, false, - false, - 0, + BTreeMap::default(), ); builder .add_monotonic_delete(MonotonicDeleteEvent { @@ -845,4 +854,76 @@ mod tests { assert_eq!(key_range.right, expected_right); assert!(key_range.right_exclusive); } + + #[tokio::test] + async fn test_check_table_and_vnode_change() { + let block_size = 256; + let table_capacity = 2 * block_size; + let opts = SstableBuilderOptions { + capacity: table_capacity, + block_capacity: block_size, + restart_interval: DEFAULT_RESTART_INTERVAL, + bloom_false_positive: 0.1, + ..Default::default() + }; + + let table_partition_vnode = + BTreeMap::from([(1_u32, 4_u32), (2_u32, 4_u32), (3_u32, 4_u32)]); + + let mut builder = CapacitySplitTableBuilder::new( + LocalTableBuilderFactory::new(1001, mock_sstable_store(), opts), + Arc::new(CompactorMetrics::unused()), + None, + false, + table_partition_vnode, + ); + + let mut table_key = VirtualNode::from_index(0).to_be_bytes().to_vec(); + table_key.extend_from_slice("a".as_bytes()); + + let (switch_builder, vnode_changed) = + builder.check_table_and_vnode_change(&UserKey::for_test(TableId::from(1), &table_key)); + assert!(switch_builder); + assert!(vnode_changed); + + { + let mut table_key = VirtualNode::from_index(62).to_be_bytes().to_vec(); + table_key.extend_from_slice("a".as_bytes()); + let (switch_builder, vnode_changed) = builder + .check_table_and_vnode_change(&UserKey::for_test(TableId::from(1), &table_key)); + assert!(!switch_builder); + assert!(vnode_changed); + + let mut table_key = VirtualNode::from_index(63).to_be_bytes().to_vec(); + table_key.extend_from_slice("a".as_bytes()); + let (switch_builder, vnode_changed) = builder + .check_table_and_vnode_change(&UserKey::for_test(TableId::from(1), &table_key)); + assert!(!switch_builder); + assert!(vnode_changed); + + let mut table_key = VirtualNode::from_index(64).to_be_bytes().to_vec(); + table_key.extend_from_slice("a".as_bytes()); + let (switch_builder, vnode_changed) = builder + .check_table_and_vnode_change(&UserKey::for_test(TableId::from(1), &table_key)); + assert!(switch_builder); + assert!(vnode_changed); + } + + let (switch_builder, vnode_changed) = + builder.check_table_and_vnode_change(&UserKey::for_test(TableId::from(2), &table_key)); + assert!(switch_builder); + assert!(vnode_changed); + let (switch_builder, vnode_changed) = + builder.check_table_and_vnode_change(&UserKey::for_test(TableId::from(3), &table_key)); + assert!(switch_builder); + assert!(vnode_changed); + let (switch_builder, vnode_changed) = + builder.check_table_and_vnode_change(&UserKey::for_test(TableId::from(4), &table_key)); + assert!(switch_builder); + assert!(vnode_changed); + let (switch_builder, vnode_changed) = + builder.check_table_and_vnode_change(&UserKey::for_test(TableId::from(5), &table_key)); + assert!(!switch_builder); + assert!(!vnode_changed); + } }
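For reference, below is a minimal, self-contained sketch of the cutting rule these changes introduce (the struct and its field names are illustrative and it assumes VirtualNode::COUNT = 256; it is not the actual CapacitySplitTableBuilder): a new SST is started when the key's table_id moves into or out of a table listed in table_vnode_partition, or when its vnode crosses a partition boundary of VirtualNode::COUNT / partition_count vnodes.

// Illustrative sketch only; names and VNODE_COUNT are assumptions, not RisingWave APIs.
use std::collections::BTreeMap;

const VNODE_COUNT: usize = 256;

struct SplitTracker {
    table_vnode_partition: BTreeMap<u32, u32>,
    last_table_id: u32,
    split_weight_by_vnode: u32,
    largest_vnode_in_current_partition: usize,
}

impl SplitTracker {
    fn new(table_vnode_partition: BTreeMap<u32, u32>) -> Self {
        Self {
            table_vnode_partition,
            last_table_id: 0,
            split_weight_by_vnode: 0,
            largest_vnode_in_current_partition: VNODE_COUNT - 1,
        }
    }

    // Returns true when the incoming (table_id, vnode) should start a new SST.
    fn should_switch(&mut self, table_id: u32, vnode: usize) -> bool {
        let mut switch = false;
        if table_id != self.last_table_id {
            let partition = self.table_vnode_partition.get(&table_id).copied();
            // Cut only when the new table or the previous one is partition-tracked.
            if partition.is_some() || self.table_vnode_partition.contains_key(&self.last_table_id) {
                self.split_weight_by_vnode = partition.unwrap_or(0);
                self.last_table_id = table_id;
                switch = true;
                self.largest_vnode_in_current_partition = if self.split_weight_by_vnode > 1 {
                    VNODE_COUNT / self.split_weight_by_vnode as usize - 1
                } else {
                    VNODE_COUNT - 1
                };
            }
        }
        if self.largest_vnode_in_current_partition != VNODE_COUNT - 1
            && vnode > self.largest_vnode_in_current_partition
        {
            // Advance the boundary to the partition that contains `vnode`.
            let step = VNODE_COUNT / self.split_weight_by_vnode as usize;
            self.largest_vnode_in_current_partition = (vnode / step + 1) * step - 1;
            switch = true;
        }
        switch
    }
}

fn main() {
    // Table 1 is split into 4 partitions of 64 vnodes each; table 9 is untracked.
    let mut tracker = SplitTracker::new(BTreeMap::from([(1u32, 4u32)]));
    assert!(tracker.should_switch(1, 0)); // entering a partitioned table
    assert!(!tracker.should_switch(1, 63)); // still inside the first partition
    assert!(tracker.should_switch(1, 64)); // crossed a 64-vnode boundary
    assert!(tracker.should_switch(9, 0)); // leaving the partitioned table
    assert!(!tracker.should_switch(9, 10)); // untracked table: no further cuts
}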