diff --git a/src/common/src/config.rs b/src/common/src/config.rs index ed70c2e7d7e34..37680ab7d62eb 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -299,6 +299,9 @@ pub struct MetaConfig { #[serde(default = "default::meta::split_group_size_limit")] pub split_group_size_limit: u64, + #[serde(default = "default::meta::max_group_size")] + pub max_group_size: u64, + #[serde(default = "default::meta::cut_table_size_limit")] pub cut_table_size_limit: u64, @@ -1235,11 +1238,15 @@ pub mod default { } pub fn move_table_size_limit() -> u64 { - 10 * 1024 * 1024 * 1024 // 10GB + 64 * 1024 * 1024 * 1024 // 64GB } pub fn split_group_size_limit() -> u64 { - 64 * 1024 * 1024 * 1024 // 64GB + 256 * 1024 * 1024 * 1024 // 256GB + } + + pub fn max_group_size() -> u64 { + 1024 * 1024 * 1024 * 1024 // 1TB } pub fn partition_vnode_count() -> u32 { diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index 21cc0c67860b0..ef862ff20cba1 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -339,6 +339,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { .meta .periodic_split_compact_group_interval_sec, split_group_size_limit: config.meta.split_group_size_limit, + max_group_size: config.meta.max_group_size, min_table_split_size: config.meta.move_table_size_limit, table_write_throughput_threshold: config.meta.table_write_throughput_threshold, min_table_split_write_throughput: config.meta.min_table_split_write_throughput, diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index e28a2d2e9fb02..52f28a6b8df99 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::time::Duration; @@ -21,7 +21,7 @@ use futures::stream::BoxStream; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use risingwave_common::system_param::reader::SystemParamsRead; -use risingwave_hummock_sdk::compaction_group::hummock_version_ext::get_compaction_group_ids; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{HummockLevelsExt, get_compaction_group_ids}; use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::CompactionGroupId; use risingwave_pb::hummock::compact_task::{self, TaskStatus}; @@ -37,6 +37,12 @@ use crate::hummock::manager::HISTORY_TABLE_INFO_STATISTIC_TIME; use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat}; use crate::hummock::{HummockManager, TASK_NORMAL}; +pub enum TableAlignRule { + NoOptimization, + SplitToSharedGroup, + SplitToDedicatedCg(u32), +} + impl HummockManager { pub fn hummock_timer_task(hummock_manager: Arc) -> (JoinHandle<()>, Sender<()>) { let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); @@ -439,36 +445,106 @@ impl HummockManager { 1, params.checkpoint_frequency() * barrier_interval_ms / 1000, ); - let created_tables = match self.metadata_manager.get_created_table_ids().await { - Ok(created_tables) => created_tables, - Err(err) => { - tracing::warn!(error = %err.as_report(), "failed to fetch created table ids"); - return; - } - }; - let created_tables: HashSet = HashSet::from_iter(created_tables); let table_write_throughput = self.history_table_throughput.read().clone(); let mut group_infos = self.calculate_compaction_group_statistic().await; group_infos.sort_by_key(|group| group.group_size); group_infos.reverse(); - for group in &group_infos { + for group in &mut group_infos { if group.table_statistic.len() == 1 { // no need to handle the separate compaciton group continue; } + let mut split_table_ids = vec![]; for (table_id, table_size) in &group.table_statistic { - self.calculate_table_align_rule( + let rule = self.calculate_table_align_rule( &table_write_throughput, table_id, - table_size, - !created_tables.contains(table_id), + *table_size, checkpoint_secs, group.group_id, group.group_size, - ) - .await; + ); + match rule { + TableAlignRule::NoOptimization => { + continue; + } + + e @ (TableAlignRule::SplitToSharedGroup + | TableAlignRule::SplitToDedicatedCg(_)) => { + split_table_ids.push((*table_id, *table_size, e)); + } + } + } + if split_table_ids.is_empty() || self.check_group_has_stale_sst(group.group_id).await { + continue; + } + let mut last_group_size = 0; + let mut last_group_split_table_ids = vec![]; + for (table_id, table_size, rule) in split_table_ids { + match rule { + TableAlignRule::NoOptimization => { + unreachable!("table align rule can not be NoOptimization"); + } + TableAlignRule::SplitToSharedGroup => { + last_group_split_table_ids.push(table_id); + last_group_size += table_size; + if last_group_size > self.env.opts.split_group_size_limit + && group.group_size + > last_group_size + self.env.opts.split_group_size_limit + { + let ret = self + .move_state_table_to_compaction_group( + group.group_id, + &last_group_split_table_ids, + 0, + ) + .await; + match ret { + Ok((new_group_id, table_vnode_partition_count)) => { + group.group_size -= last_group_size; + tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", + table_id, group.group_id, new_group_id, table_vnode_partition_count); + } + Err(e) => { + tracing::info!( + error = %e.as_report(), + "failed to move state table [{}] from group-{}", + table_id, + group.group_id, + ) + } + } + last_group_split_table_ids.clear(); + last_group_size = 0; + } + } + TableAlignRule::SplitToDedicatedCg(partition_vnode_count) => { + let ret = self + .move_state_table_to_compaction_group( + group.group_id, + &[table_id], + partition_vnode_count, + ) + .await; + match ret { + Ok((new_group_id, table_vnode_partition_count)) => { + group.group_size -= table_size; + tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", + table_id, group.group_id, new_group_id, table_vnode_partition_count); + } + Err(e) => { + tracing::info!( + error = %e.as_report(), + "failed to move state table [{}] from group-{}", + table_id, + group.group_id, + ) + } + } + } + } } } } @@ -486,55 +562,40 @@ impl HummockManager { } } - async fn calculate_table_align_rule( + fn calculate_table_align_rule( &self, table_write_throughput: &HashMap>, table_id: &u32, - table_size: &u64, - is_creating_table: bool, + table_size: u64, checkpoint_secs: u64, parent_group_id: u64, group_size: u64, - ) { + ) -> TableAlignRule { let default_group_id: CompactionGroupId = StaticCompactionGroupId::StateDefault.into(); let mv_group_id: CompactionGroupId = StaticCompactionGroupId::MaterializedView.into(); let partition_vnode_count = self.env.opts.partition_vnode_count; let window_size = HISTORY_TABLE_INFO_STATISTIC_TIME / (checkpoint_secs as usize); let mut is_high_write_throughput = false; - let mut is_low_write_throughput = true; if let Some(history) = table_write_throughput.get(table_id) { - if !is_creating_table { - if history.len() >= window_size { - is_high_write_throughput = history.iter().all(|throughput| { - *throughput / checkpoint_secs - > self.env.opts.table_write_throughput_threshold - }); - is_low_write_throughput = history.iter().any(|throughput| { - *throughput / checkpoint_secs - < self.env.opts.min_table_split_write_throughput - }); - } - } else { - // For creating table, relax the checking restrictions to make the data alignment behavior more sensitive. - let sum = history.iter().sum::(); - is_low_write_throughput = sum - < self.env.opts.min_table_split_write_throughput - * history.len() as u64 - * checkpoint_secs; + if history.len() >= window_size { + is_high_write_throughput = history.iter().all(|throughput| { + *throughput / checkpoint_secs > self.env.opts.table_write_throughput_threshold + }); } } - let state_table_size = *table_size; - - // 1. Avoid splitting a creating table - // 2. Avoid splitting a is_low_write_throughput creating table - // 3. Avoid splitting a non-high throughput medium-sized table - if is_creating_table - || (is_low_write_throughput) - || (state_table_size < self.env.opts.min_table_split_size && !is_high_write_throughput) - { - return; + let state_table_size = table_size; + + // 1. Avoid splitting a small table. + // 2. Splitting a high throughput medium-sized table + // 3. Avoid splitting a non-high throughput table + if state_table_size < self.env.opts.min_table_split_size { + return TableAlignRule::NoOptimization; + } else if is_high_write_throughput { + return TableAlignRule::SplitToDedicatedCg(partition_vnode_count); + } else if group_size < self.env.opts.max_group_size { + return TableAlignRule::NoOptimization; } // do not split a large table and a small table because it would increase IOPS @@ -542,31 +603,25 @@ impl HummockManager { if parent_group_id != default_group_id && parent_group_id != mv_group_id { let rest_group_size = group_size - state_table_size; if rest_group_size < state_table_size - && rest_group_size < self.env.opts.min_table_split_size + && rest_group_size < self.env.opts.split_group_size_limit { - return; + return TableAlignRule::NoOptimization; } } - let ret = self - .move_state_table_to_compaction_group( - parent_group_id, - &[*table_id], - partition_vnode_count, - ) - .await; - match ret { - Ok((new_group_id, table_vnode_partition_count)) => { - tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, table_vnode_partition_count); - } - Err(e) => { - tracing::info!( - error = %e.as_report(), - "failed to move state table [{}] from group-{}", - table_id, - parent_group_id, - ) - } + if table_size > self.env.opts.split_group_size_limit { + TableAlignRule::SplitToDedicatedCg(0) + } else { + TableAlignRule::SplitToSharedGroup + } + } + + pub async fn check_group_has_stale_sst(&self, group_id: u64) -> bool { + let versioning_guard = self.versioning.read().await; + let version = &versioning_guard.current_version; + match version.levels.get(&group_id) { + Some(group) => group.check_reclaim_sst_exist(), + None => false, } } } diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index a8e2909a11bf7..fe993be44d7da 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -228,6 +228,8 @@ pub struct MetaOpts { /// The size limit to move a state-table to other group. pub min_table_split_size: u64, + pub max_group_size: u64, + /// Whether config object storage bucket lifecycle to purge stale data. pub do_not_config_object_storage_lifecycle: bool, @@ -317,6 +319,7 @@ impl MetaOpts { periodic_tombstone_reclaim_compaction_interval_sec: 60, periodic_split_compact_group_interval_sec: 60, split_group_size_limit: 5 * 1024 * 1024 * 1024, + max_group_size: 20 * 1024 * 1024 * 1024, min_table_split_size: 2 * 1024 * 1024 * 1024, compact_task_table_size_partition_threshold_low: 128 * 1024 * 1024, compact_task_table_size_partition_threshold_high: 512 * 1024 * 1024, diff --git a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs index d2e71ab82fa2b..969f1e45c0751 100644 --- a/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs +++ b/src/storage/hummock_sdk/src/compaction_group/hummock_version_ext.rs @@ -853,6 +853,32 @@ impl Levels { } delete_sst_ids_set.is_empty() } + + pub fn check_reclaim_sst_exist(&self) -> bool { + let existed_table_ids: HashSet = + HashSet::from_iter(self.member_table_ids.clone().into_iter()); + for level in &self.l0.as_ref().unwrap().sub_levels { + for table in &level.table_infos { + for table_id in &table.table_ids { + if !existed_table_ids.contains(table_id) { + return true; + } + } + } + } + + for level in &self.levels { + for table in &level.table_infos { + for table_id in &table.table_ids { + if !existed_table_ids.contains(table_id) { + return true; + } + } + } + } + + false + } } pub fn build_initial_compaction_group_levels(