diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 20bf9407d112..b873a55e8d6c 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -386,8 +386,9 @@ impl HummockManager { &self, compaction_group_ids: &[CompactionGroupId], config_to_update: &[MutableConfig], - ) -> Result<()> { - self.compaction_group_manager + ) -> Result> { + let result = self + .compaction_group_manager .write() .await .update_compaction_config( @@ -402,7 +403,7 @@ impl HummockManager { { self.try_update_write_limits(compaction_group_ids).await; } - Ok(()) + Ok(result) } /// Gets complete compaction group info. @@ -801,13 +802,14 @@ impl CompactionGroupManager { self.default_config.clone() } - async fn update_compaction_config( + pub async fn update_compaction_config( &mut self, compaction_group_ids: &[CompactionGroupId], config_to_update: &[MutableConfig], meta_store: &S, - ) -> Result<()> { + ) -> Result> { let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups); + let mut result = Vec::with_capacity(compaction_group_ids.len()); for compaction_group_id in compaction_group_ids.iter().unique() { let group = compaction_groups.get(compaction_group_id).ok_or_else(|| { Error::CompactionGroup(format!("invalid group {}", *compaction_group_id)) @@ -819,14 +821,15 @@ impl CompactionGroupManager { } let mut new_group = group.clone(); new_group.compaction_config = Arc::new(config); - compaction_groups.insert(*compaction_group_id, new_group); + compaction_groups.insert(*compaction_group_id, new_group.clone()); + result.push(new_group); } let mut trx = Transaction::default(); compaction_groups.apply_to_txn(&mut trx)?; meta_store.txn(trx).await?; compaction_groups.commit(); - Ok(()) + Ok(result) } /// Initializes the config for a group. diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index bded17217d36..29dd913df7a3 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -28,6 +28,7 @@ use futures::stream::{BoxStream, FuturesUnordered}; use futures::{FutureExt, StreamExt}; use itertools::Itertools; use parking_lot::Mutex; +use risingwave_common::config::default::compaction_config; use risingwave_common::monitor::rwlock::MonitoredRwLock; use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH}; use risingwave_common::util::{pending_on_none, select_all}; @@ -44,6 +45,7 @@ use risingwave_hummock_sdk::{ }; use risingwave_pb::hummock::compact_task::{self, TaskStatus, TaskType}; use risingwave_pb::hummock::group_delta::DeltaType; +use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config; use risingwave_pb::hummock::subscribe_compaction_event_request::{ Event as RequestEvent, HeartBeat, PullTask, ReportTask, }; @@ -511,7 +513,7 @@ impl HummockManager { versioning_guard.mark_objects_for_deletion(); let all_group_ids = get_compaction_group_ids(&versioning_guard.current_version); - let configs = self + let mut configs = self .compaction_group_manager .write() .await @@ -520,6 +522,46 @@ impl HummockManager { self.env.meta_store(), ) .await?; + + // We've already lowered the default limit for write limit in PR-12183, and to prevent older clusters from continuing to use the outdated configuration, we've introduced a new logic to rewrite it in a uniform way. + let mut rewrite_cg_ids = vec![]; + for (cg_id, compaction_group_config) in &mut configs { + // update write limit + let relaxed_default_write_stop_level_count = 1000; + if compaction_group_config + .compaction_config + .level0_sub_level_compact_level_count + == relaxed_default_write_stop_level_count + { + rewrite_cg_ids.push(*cg_id); + } + } + + if !rewrite_cg_ids.is_empty() { + tracing::info!("Compaction group {:?} configs rewrite ", rewrite_cg_ids); + + // update meta store + let result = self + .compaction_group_manager + .write() + .await + .update_compaction_config( + &rewrite_cg_ids, + &[ + mutable_config::MutableConfig::Level0StopWriteThresholdSubLevelNumber( + compaction_config::level0_stop_write_threshold_sub_level_number(), + ), + ], + self.env.meta_store(), + ) + .await?; + + // update memory + for new_config in result { + configs.insert(new_config.group_id(), new_config); + } + } + versioning_guard.write_limit = calc_new_write_limits(configs, HashMap::new(), &versioning_guard.current_version); trigger_write_stop_stats(&self.metrics, &versioning_guard.write_limit);