From 6ef8ba3498cfdf54613c0e26f0ffe57e5f43bef8 Mon Sep 17 00:00:00 2001
From: Li0k
Date: Tue, 29 Oct 2024 14:41:16 +0800
Subject: [PATCH] add config

---
 src/common/src/config.rs                     | 16 ++++++
 src/meta/node/src/lib.rs                     |  6 ++
 src/meta/src/hummock/manager/commit_epoch.rs |  4 +-
 .../compaction/compaction_group_schedule.rs  | 56 ++++++++++++-------
 src/meta/src/hummock/manager/mod.rs          |  5 +-
 src/meta/src/hummock/manager/timer_task.rs   | 33 +++++++++--
 src/meta/src/manager/env.rs                  |  7 +++
 7 files changed, 97 insertions(+), 30 deletions(-)

diff --git a/src/common/src/config.rs b/src/common/src/config.rs
index 60824468cce0d..8335ef04e2508 100644
--- a/src/common/src/config.rs
+++ b/src/common/src/config.rs
@@ -421,6 +421,14 @@ pub struct MetaConfig {
     #[serde(default = "default::meta::periodic_scheduling_compaction_group_merge_interval_sec")]
     pub periodic_scheduling_compaction_group_merge_interval_sec: u64,
 
+    #[serde(default = "default::meta::periodic_table_stat_throuput_reclaim_interval_sec")]
+    pub periodic_table_stat_throuput_reclaim_interval_sec: u64,
+
+    /// How long table throughput statistics are retained: statistics with `timestamp < now - table_stat_old_throuput_reclaim_interval_sec` will be removed.
+    #[serde(default = "default::meta::table_stat_old_throuput_reclaim_interval_sec")]
+    pub table_stat_old_throuput_reclaim_interval_sec: u64,
+
     #[serde(default)]
     #[config_doc(nested)]
     pub meta_store_config: MetaStoreConfig,
@@ -1578,6 +1586,14 @@ pub mod default {
         pub fn periodic_scheduling_compaction_group_merge_interval_sec() -> u64 {
             60 * 10 // 10min
         }
+
+        pub fn periodic_table_stat_throuput_reclaim_interval_sec() -> u64 {
+            60 // 1min
+        }
+
+        pub fn table_stat_old_throuput_reclaim_interval_sec() -> u64 {
+            60 * 10 // 10min
+        }
     }
 
     pub mod server {
diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs
index 0aa4c06f707e0..503b242df5181 100644
--- a/src/meta/node/src/lib.rs
+++ b/src/meta/node/src/lib.rs
@@ -398,6 +398,9 @@ pub fn start(
             periodic_scheduling_compaction_group_merge_interval_sec: config
                 .meta
                 .periodic_scheduling_compaction_group_merge_interval_sec,
+            periodic_table_stat_throuput_reclaim_interval_sec: config
+                .meta
+                .periodic_table_stat_throuput_reclaim_interval_sec,
             table_high_write_throughput_threshold: config
                 .meta
                 .table_high_write_throughput_threshold,
@@ -447,6 +450,9 @@ pub fn start(
             table_stat_throuput_window_seconds_for_merge: config
                 .meta
                 .table_stat_throuput_window_seconds_for_merge,
+            table_stat_old_throuput_reclaim_interval_sec: config
+                .meta
+                .table_stat_old_throuput_reclaim_interval_sec,
             object_store_config: config.storage.object_store,
             max_trivial_move_task_count_per_loop: config
                 .meta
diff --git a/src/meta/src/hummock/manager/commit_epoch.rs b/src/meta/src/hummock/manager/commit_epoch.rs
index b76a9934f4438..2edc206a56351 100644
--- a/src/meta/src/hummock/manager/commit_epoch.rs
+++ b/src/meta/src/hummock/manager/commit_epoch.rs
@@ -33,7 +33,7 @@ use risingwave_pb::hummock::compact_task::{self};
 use risingwave_pb::hummock::CompactionConfig;
 use sea_orm::TransactionTrait;
 
-use super::TableThroughputStatistic;
+use super::TableWriteThroughputStatistic;
 use crate::hummock::error::{Error, Result};
 use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
 use crate::hummock::manager::transaction::{
@@ -359,7 +359,7 @@ impl HummockManager {
         for (table_id, stat) in table_stats {
             let throughput = (stat.total_value_size + stat.total_key_size) as u64;
             let entry = table_infos.entry(table_id).or_default();
-            entry.push_back(TableThroughputStatistic {
+            entry.push_back(TableWriteThroughputStatistic {
                 throughput,
                 timestamp,
             });
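
With the plumbing above, `commit_epoch` appends one `TableWriteThroughputStatistic { throughput, timestamp }` sample per written table at every checkpoint, so the history map grows until the reclaim task added below in `timer_task.rs` prunes it. A minimal standalone sketch of that recording shape follows; the `record` helper and its tuple input are hypothetical stand-ins for the inline logic in `commit_epoch`, while the struct mirrors the patch:

    use std::collections::{HashMap, VecDeque};

    // Mirrors the renamed struct in mod.rs.
    #[derive(Debug, Clone)]
    struct TableWriteThroughputStatistic {
        throughput: u64,
        timestamp: i64, // seconds
    }

    // Hypothetical helper: the real logic lives inline in `commit_epoch`.
    fn record(
        history: &mut HashMap<u32, VecDeque<TableWriteThroughputStatistic>>,
        table_stats: &[(u32, u64, u64)], // (table_id, total_key_size, total_value_size)
        timestamp: i64,
    ) {
        for &(table_id, total_key_size, total_value_size) in table_stats {
            // Same formula as the patch: key bytes + value bytes written this checkpoint.
            let throughput = total_key_size + total_value_size;
            history
                .entry(table_id)
                .or_default()
                .push_back(TableWriteThroughputStatistic { throughput, timestamp });
        }
    }

    fn main() {
        let mut history = HashMap::new();
        record(&mut history, &[(1, 10, 20), (2, 5, 5)], 1_000);
        record(&mut history, &[(1, 30, 40)], 1_001);
        println!("{:?}", history[&1]); // two samples recorded for table 1
    }
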
entry.push_back(TableThroughputStatistic { + entry.push_back(TableWriteThroughputStatistic { throughput, timestamp, }); diff --git a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs index 6b02d18a53bd8..e67349a905b2e 100644 --- a/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs +++ b/src/meta/src/hummock/manager/compaction/compaction_group_schedule.rs @@ -39,7 +39,7 @@ use crate::hummock::manager::transaction::HummockVersionTransaction; use crate::hummock::manager::{commit_multi_var, HummockManager}; use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat; use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; -use crate::hummock::TableThroughputStatistic; +use crate::hummock::TableWriteThroughputStatistic; impl HummockManager { pub async fn merge_compaction_group( @@ -693,7 +693,7 @@ impl HummockManager { /// Split the compaction group if the group is too large or contains high throughput tables. pub async fn try_split_compaction_group( &self, - table_write_throughput: &HashMap>, + table_write_throughput: &HashMap>, checkpoint_secs: u64, group: CompactionGroupStatistic, ) { @@ -724,7 +724,7 @@ impl HummockManager { /// Try to move the high throughput table to a dedicated compaction group. pub async fn try_move_high_throughput_table_to_dedicated_cg( &self, - table_write_throughput: &HashMap>, + table_write_throughput: &HashMap>, table_id: &u32, _table_size: &u64, checkpoint_secs: u64, @@ -831,7 +831,7 @@ impl HummockManager { pub async fn try_merge_compaction_group( &self, - table_write_throughput: &HashMap>, + table_write_throughput: &HashMap>, group: &CompactionGroupStatistic, next_group: &CompactionGroupStatistic, checkpoint_secs: u64, @@ -965,7 +965,7 @@ impl HummockManager { /// Check if the table is high write throughput with the given threshold and ratio. 
diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs
index d0bdfca8ee4c9..6b2988280016f 100644
--- a/src/meta/src/hummock/manager/mod.rs
+++ b/src/meta/src/hummock/manager/mod.rs
@@ -98,7 +98,8 @@ pub struct HummockManager {
     version_checkpoint_path: String,
     version_archive_dir: String,
     pause_version_checkpoint: AtomicBool,
-    history_table_throughput: parking_lot::RwLock<HashMap<u32, VecDeque<TableThroughputStatistic>>>,
+    history_table_throughput:
+        parking_lot::RwLock<HashMap<u32, VecDeque<TableWriteThroughputStatistic>>>,
 
     // for compactor
     // `compactor_streams_change_tx` is used to pass the mapping from `context_id` to event_stream
@@ -497,7 +498,7 @@ async fn write_exclusive_cluster_id(
 }
 
 #[derive(Debug, Clone)]
-pub struct TableThroughputStatistic {
+pub struct TableWriteThroughputStatistic {
     pub throughput: u64,
     pub timestamp: i64,
 }
diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs
index be6fff990674a..30b4d984469c4 100644
--- a/src/meta/src/hummock/manager/timer_task.rs
+++ b/src/meta/src/hummock/manager/timer_task.rs
@@ -56,6 +56,7 @@ impl HummockManager {
             FullGc,
             GroupScheduleMerge,
+            ReclaimTableWriteThroughputStatistic,
         }
 
         let mut check_compact_trigger_interval =
             tokio::time::interval(Duration::from_secs(CHECK_PENDING_TASK_PERIOD_SEC));
@@ -181,6 +182,23 @@ impl HummockManager {
                 triggers.push(Box::pin(group_scheduling_merge_trigger));
             }
 
+            let periodic_table_stat_throuput_reclaim_interval_sec = hummock_manager
+                .env
+                .opts
+                .periodic_table_stat_throuput_reclaim_interval_sec;
+
+            if periodic_table_stat_throuput_reclaim_interval_sec > 0 {
+                let mut table_stat_throuput_reclaim_trigger_interval = tokio::time::interval(
+                    Duration::from_secs(periodic_table_stat_throuput_reclaim_interval_sec),
+                );
+                table_stat_throuput_reclaim_trigger_interval
+                    .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
+                let table_stat_throuput_reclaim_trigger =
+                    IntervalStream::new(table_stat_throuput_reclaim_trigger_interval)
+                        .map(|_| HummockTimerEvent::ReclaimTableWriteThroughputStatistic);
+                triggers.push(Box::pin(table_stat_throuput_reclaim_trigger));
+            }
+
             let event_stream = select_all(triggers);
             use futures::pin_mut;
             pin_mut!(event_stream);
@@ -213,11 +231,6 @@ impl HummockManager {
                                             continue;
                                         }
 
-                                        // TODO(li0k): replace with config after PR #18806
-                                        const RECLAIM_TABLE_WRITE_THROUGHPUT_PERIOD_SEC: i64 = 60 * 10; // 10 minutes
-                                        hummock_manager.reclaim_table_write_throughput(
-                                            RECLAIM_TABLE_WRITE_THROUGHPUT_PERIOD_SEC,
-                                        );
                                         hummock_manager.on_handle_schedule_group_split().await;
                                     }
 
@@ -448,6 +461,16 @@ impl HummockManager {
                                         tracing::info!("Start full GC from meta node.");
                                     }
                                 }
+
+                                HummockTimerEvent::ReclaimTableWriteThroughputStatistic => {
+                                    hummock_manager.reclaim_table_write_throughput(
+                                        hummock_manager
+                                            .env
+                                            .opts
+                                            .table_stat_old_throuput_reclaim_interval_sec
+                                            as _,
+                                    );
+                                }
                             }
                         }
                     }
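
Every branch in `timer_task.rs` follows the same trigger pattern that the new reclaim trigger reuses: build a `tokio::time::interval`, set `MissedTickBehavior::Delay` so a stalled loop does not fire a burst of catch-up ticks, wrap the interval in an `IntervalStream`, map each tick to an event variant, and merge all triggers with `select_all`. A self-contained sketch of that pattern, assuming `tokio`, `tokio-stream`, and `futures` as dependencies (the event names here are illustrative, not RisingWave's):

    use std::time::Duration;

    use futures::stream::{select_all, BoxStream, StreamExt};
    use tokio_stream::wrappers::IntervalStream;

    #[derive(Debug)]
    enum TimerEvent {
        CheckPending,
        ReclaimThroughputStats,
    }

    #[tokio::main]
    async fn main() {
        let mut triggers: Vec<BoxStream<'static, TimerEvent>> = Vec::new();

        // Each trigger is one interval mapped to one event variant.
        let mut reclaim_interval = tokio::time::interval(Duration::from_secs(1));
        // Delay (rather than burst) if a tick was missed, matching the patch.
        reclaim_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        triggers.push(
            IntervalStream::new(reclaim_interval)
                .map(|_| TimerEvent::ReclaimThroughputStats)
                .boxed(),
        );

        let mut check_interval = tokio::time::interval(Duration::from_secs(2));
        check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        triggers.push(
            IntervalStream::new(check_interval)
                .map(|_| TimerEvent::CheckPending)
                .boxed(),
        );

        // One merged stream drives the whole timer loop.
        let mut events = select_all(triggers);
        for _ in 0..5 {
            if let Some(event) = events.next().await {
                println!("handle {event:?}");
            }
        }
    }

Guarding the trigger registration behind `> 0` also gives operators an off switch: setting the interval to zero simply never registers the stream.
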
diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs
index 2d11556f6ef80..82161b5a77669 100644
--- a/src/meta/src/manager/env.rs
+++ b/src/meta/src/manager/env.rs
@@ -227,6 +227,11 @@ pub struct MetaOpts {
     /// The window seconds of table throughput statistic history for merge compaction group.
     pub table_stat_throuput_window_seconds_for_merge: usize,
 
+    pub periodic_table_stat_throuput_reclaim_interval_sec: u64,
+
+    /// How long table throughput statistics are retained: statistics with `timestamp < now - table_stat_old_throuput_reclaim_interval_sec` will be removed.
+    pub table_stat_old_throuput_reclaim_interval_sec: u64,
+
     /// The configuration of the object store
     pub object_store_config: ObjectStoreConfig,
@@ -324,6 +329,8 @@ impl MetaOpts {
             table_stat_throuput_window_seconds_for_split: 60,
             table_stat_throuput_window_seconds_for_merge: 240,
             periodic_scheduling_compaction_group_merge_interval_sec: 60 * 10,
+            periodic_table_stat_throuput_reclaim_interval_sec: 60,
+            table_stat_old_throuput_reclaim_interval_sec: 60 * 10,
             license_key_path: None,
         }
     }
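
Taken together, the two options divide the work: every `periodic_table_stat_throuput_reclaim_interval_sec` seconds the timer emits `ReclaimTableWriteThroughputStatistic`, and the handler passes `table_stat_old_throuput_reclaim_interval_sec` to `reclaim_table_write_throughput` as the retention window. The patch does not show that method's body, so the pruning below is only a plausible sketch of what it implies, not the actual implementation:

    use std::collections::{HashMap, VecDeque};
    use std::time::{SystemTime, UNIX_EPOCH};

    #[derive(Debug, Clone)]
    struct TableWriteThroughputStatistic {
        throughput: u64,
        timestamp: i64, // seconds
    }

    // Hypothetical: the real `reclaim_table_write_throughput` may differ in detail,
    // e.g. in whether it keeps or drops tables whose history becomes empty.
    fn reclaim_table_write_throughput(
        history: &mut HashMap<u32, VecDeque<TableWriteThroughputStatistic>>,
        retention_sec: i64,
    ) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("clock set before unix epoch")
            .as_secs() as i64;
        let cutoff = now - retention_sec;
        for stats in history.values_mut() {
            // Samples are appended in timestamp order, so expired ones sit at the front.
            while stats.front().is_some_and(|s| s.timestamp < cutoff) {
                stats.pop_front();
            }
        }
    }

    fn main() {
        let mut history: HashMap<u32, VecDeque<TableWriteThroughputStatistic>> = HashMap::new();
        history
            .entry(1)
            .or_default()
            .push_back(TableWriteThroughputStatistic { throughput: 42, timestamp: 0 });
        reclaim_table_write_throughput(&mut history, 600);
        assert!(history[&1].is_empty()); // a sample from 1970 is long expired
    }

With the test defaults above (reclaim every 60 seconds, retain 600 seconds), each table keeps roughly the last ten minutes of samples; the sketch keeps empty histories around, which is consistent with the silent-group check in `compaction_group_schedule.rs`.
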