From 759b2228fa4f556d281814afc4842cdd8ba9c23c Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 13 Jun 2024 15:15:48 +0800 Subject: [PATCH] refactor(storage): refactor compaction group manager txn (#17070) --- src/meta/node/src/server.rs | 2 +- src/meta/src/controller/session_params.rs | 2 +- src/meta/src/hummock/compactor_manager.rs | 2 +- src/meta/src/hummock/manager/compaction.rs | 16 +- .../manager/compaction_group_manager.rs | 486 ++++++++---------- src/meta/src/hummock/manager/context.rs | 4 +- src/meta/src/hummock/manager/mod.rs | 14 +- src/meta/src/hummock/manager/tests.rs | 4 +- src/meta/src/hummock/manager/timer_task.rs | 9 +- src/meta/src/manager/cluster.rs | 2 +- src/meta/src/manager/env.rs | 4 +- src/meta/src/model/catalog.rs | 2 +- src/storage/hummock_sdk/src/version.rs | 4 +- 13 files changed, 255 insertions(+), 296 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 8802ebb37951e..e8b738305dce7 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -659,7 +659,7 @@ pub async fn start_service_as_election_leader( ); let health_srv = HealthServiceImpl::new(); let backup_srv = BackupServiceImpl::new(backup_manager); - let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store_ref()); + let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store()); let system_params_srv = SystemParamsServiceImpl::new(env.system_params_manager_impl_ref()); let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref()); let serving_srv = diff --git a/src/meta/src/controller/session_params.rs b/src/meta/src/controller/session_params.rs index 4a27967fa2b0a..566170a0ef4d2 100644 --- a/src/meta/src/controller/session_params.rs +++ b/src/meta/src/controller/session_params.rs @@ -151,7 +151,7 @@ mod tests { use sea_orm::QueryFilter; let env = MetaSrvEnv::for_test_with_sql_meta_store().await; - let meta_store = env.meta_store().as_sql(); + let meta_store = env.meta_store_ref().as_sql(); let init_params = SessionConfig::default(); // init system parameter controller as first launch. diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs index 0876f31d211e3..ab0c868f703c4 100644 --- a/src/meta/src/hummock/compactor_manager.rs +++ b/src/meta/src/hummock/compactor_manager.rs @@ -138,7 +138,7 @@ impl CompactorManagerInner { use risingwave_meta_model_v2::compaction_task; use sea_orm::EntityTrait; // Retrieve the existing task assignments from metastore. - let task_assignment: Vec = match env.meta_store() { + let task_assignment: Vec = match env.meta_store_ref() { MetaStoreImpl::Kv(meta_store) => CompactTaskAssignment::list(meta_store).await?, MetaStoreImpl::Sql(sql_meta_store) => compaction_task::Entity::find() .all(&sql_meta_store.conn) diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index 7a27061c8d373..1969fc8ffc348 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -704,15 +704,15 @@ impl HummockManager { // When the last table of a compaction group is deleted, the compaction group (and its // config) is destroyed as well. Then a compaction task for this group may come later and // cannot find its config. - let group_config = match self - .compaction_group_manager - .read() - .await - .try_get_compaction_group_config(compaction_group_id) - { - Some(config) => config, - None => continue, + let group_config = { + let config_manager = self.compaction_group_manager.read().await; + + match config_manager.try_get_compaction_group_config(compaction_group_id) { + Some(config) => config, + None => continue, + } }; + // StoredIdGenerator already implements ids pre-allocation by ID_PREALLOCATE_INTERVAL. let task_id = next_compaction_task_id(&self.env).await?; diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 063ce4e0a58b1..3a3d596844e95 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -42,17 +42,15 @@ use crate::hummock::compaction::compaction_config::{ }; use crate::hummock::error::{Error, Result}; use crate::hummock::manager::transaction::HummockVersionTransaction; -use crate::hummock::manager::versioning::{calc_new_write_limits, Versioning}; +use crate::hummock::manager::versioning::Versioning; use crate::hummock::manager::{commit_multi_var, HummockManager}; -use crate::hummock::metrics_utils::{ - remove_compaction_group_in_sst_stat, trigger_write_stop_stats, -}; +use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat; use crate::hummock::model::CompactionGroup; use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; use crate::manager::{MetaSrvEnv, MetaStoreImpl}; -use crate::model::{ - BTreeMapEntryTransaction, BTreeMapTransaction, MetadataModel, MetadataModelError, -}; +use crate::model::{BTreeMapTransaction, MetadataModel, MetadataModelError}; + +type CompactionGroupTransaction<'a> = BTreeMapTransaction<'a, CompactionGroupId, CompactionGroup>; impl CompactionGroupManager { pub(super) async fn new(env: &MetaSrvEnv) -> Result { @@ -69,13 +67,38 @@ impl CompactionGroupManager { ) -> Result { let mut compaction_group_manager = CompactionGroupManager { compaction_groups: BTreeMap::new(), - default_config, + default_config: Arc::new(default_config), write_limit: Default::default(), - meta_store_impl: env.meta_store_ref(), }; - compaction_group_manager.init().await?; + + let loaded_compaction_groups: BTreeMap = + match env.meta_store_ref() { + MetaStoreImpl::Kv(meta_store) => CompactionGroup::list(meta_store) + .await? + .into_iter() + .map(|cg| (cg.group_id(), cg)) + .collect(), + MetaStoreImpl::Sql(sql_meta_store) => { + use sea_orm::EntityTrait; + compaction_config::Entity::find() + .all(&sql_meta_store.conn) + .await + .map_err(MetadataModelError::from)? + .into_iter() + .map(|m| (m.compaction_group_id as CompactionGroupId, m.into())) + .collect() + } + }; + + compaction_group_manager.init(loaded_compaction_groups); Ok(compaction_group_manager) } + + fn init(&mut self, loaded_compaction_groups: BTreeMap) { + if !loaded_compaction_groups.is_empty() { + self.compaction_groups = loaded_compaction_groups; + } + } } impl HummockManager { @@ -167,7 +190,10 @@ impl HummockManager { } let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); + let mut compaction_group_manager = self.compaction_group_manager.write().await; let current_version = &versioning.current_version; + let default_config = compaction_group_manager.default_compaction_config(); + let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn(); for (table_id, _) in pairs { if let Some(old_group) = @@ -210,15 +236,17 @@ impl HummockManager { .entry(group_id) .or_default() .group_deltas; - let config = self - .compaction_group_manager - .write() - .await - .get_or_insert_compaction_group_config(group_id) - .await? - .compaction_config - .as_ref() - .clone(); + + let config = + match compaction_groups_txn.try_get_compaction_group_config(group_id) { + Some(config) => config.compaction_config.as_ref().clone(), + None => { + compaction_groups_txn + .create_compaction_groups(group_id, default_config.clone()); + default_config.as_ref().clone() + } + }; + group_deltas.push(GroupDelta { delta_type: Some(DeltaType::GroupConstruct(GroupConstruct { group_config: Some(config), @@ -233,6 +261,7 @@ impl HummockManager { .entry(group_id) .or_default() .group_deltas; + group_deltas.push(GroupDelta { delta_type: Some(DeltaType::GroupMetaChange(GroupMetaChange { table_ids_add: vec![*table_id], @@ -251,7 +280,7 @@ impl HummockManager { .is_none()); } new_version_delta.pre_apply(); - commit_multi_var!(self.meta_store_ref(), version)?; + commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; Ok(()) } @@ -334,22 +363,23 @@ impl HummockManager { delta_type: Some(DeltaType::GroupDestroy(GroupDestroy {})), }); } - new_version_delta.pre_apply(); - commit_multi_var!(self.meta_store_ref(), version)?; for (group_id, max_level) in groups_to_remove { remove_compaction_group_in_sst_stat(&self.metrics, group_id, max_level); } + new_version_delta.pre_apply(); + // Purge may cause write to meta store. If it hurts performance while holding versioning // lock, consider to make it in batch. - self.compaction_group_manager - .write() - .await - .purge(HashSet::from_iter(get_compaction_group_ids( - &versioning.current_version, - ))) - .await?; + let mut compaction_group_manager = self.compaction_group_manager.write().await; + let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn(); + + compaction_groups_txn.purge(HashSet::from_iter(get_compaction_group_ids( + version.latest_version(), + ))); + commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; + Ok(()) } @@ -357,21 +387,25 @@ impl HummockManager { &self, compaction_group_ids: &[CompactionGroupId], config_to_update: &[MutableConfig], - ) -> Result> { - let result = self - .compaction_group_manager - .write() - .await - .update_compaction_config(compaction_group_ids, config_to_update) - .await?; + ) -> Result<()> { + { + // Avoid lock conflicts with `try_update_write_limits`` + let mut compaction_group_manager = self.compaction_group_manager.write().await; + let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn(); + compaction_groups_txn + .update_compaction_config(compaction_group_ids, config_to_update)?; + commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?; + } + if config_to_update .iter() .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_))) { + // Update write limits with lock self.try_update_write_limits(compaction_group_ids).await; } - Ok(result) + Ok(()) } /// Gets complete compaction group info. @@ -380,24 +414,25 @@ impl HummockManager { let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); let current_version = &versioning.current_version; - let mut compaction_groups = vec![]; + let mut results = vec![]; + let compaction_group_manager = self.compaction_group_manager.read().await; + for levels in current_version.levels.values() { - let config = self - .compaction_group_manager - .read() - .await + let compaction_config = compaction_group_manager .try_get_compaction_group_config(levels.group_id) .unwrap() - .compaction_config; + .compaction_config + .as_ref() + .clone(); let group = CompactionGroupInfo { id: levels.group_id, parent_id: levels.parent_group_id, member_table_ids: levels.member_table_ids.clone(), - compaction_config: Some(config.as_ref().clone()), + compaction_config: Some(compaction_config), }; - compaction_groups.push(group); + results.push(group); } - compaction_groups + results } /// Splits a compaction group into two. The new one will contain `table_ids`. @@ -415,21 +450,19 @@ impl HummockManager { ) .await?; - Ok(result.0) + Ok(result) } /// move some table to another compaction-group. Create a new compaction group if it does not /// exist. - /// TODO: Move `table_to_partition` in result to compaction group pub async fn move_state_table_to_compaction_group( &self, parent_group_id: CompactionGroupId, table_ids: &[StateTableId], partition_vnode_count: u32, - ) -> Result<(CompactionGroupId, BTreeMap)> { - let mut table_to_partition = BTreeMap::default(); + ) -> Result { if table_ids.is_empty() { - return Ok((parent_group_id, table_to_partition)); + return Ok(parent_group_id); } let table_ids = table_ids.iter().cloned().unique().collect_vec(); let compaction_guard = self.compaction.write().await; @@ -483,7 +516,9 @@ impl HummockManager { .compaction_group_manager .read() .await - .default_compaction_config(); + .default_compaction_config() + .as_ref() + .clone(); config.split_weight_by_vnode = partition_vnode_count; new_version_delta.group_deltas.insert( @@ -520,20 +555,12 @@ impl HummockManager { let (new_compaction_group_id, config) = new_group; { let mut compaction_group_manager = self.compaction_group_manager.write().await; - let insert = BTreeMapEntryTransaction::new_insert( - &mut compaction_group_manager.compaction_groups, - new_compaction_group_id, - CompactionGroup { - group_id: new_compaction_group_id, - compaction_config: Arc::new(config), - }, - ); + let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn(); + compaction_groups_txn + .create_compaction_groups(new_compaction_group_id, Arc::new(config)); + new_version_delta.pre_apply(); - commit_multi_var!(self.meta_store_ref(), version, insert)?; - // Currently, only splitting out a single table_id is supported. - for table_id in table_ids { - table_to_partition.insert(table_id, partition_vnode_count); - } + commit_multi_var!(self.meta_store_ref(), version, compaction_groups_txn)?; } let mut canceled_tasks = vec![]; @@ -571,7 +598,7 @@ impl HummockManager { .with_label_values(&[&parent_group_id.to_string()]) .inc(); - Ok((target_compaction_group_id, table_to_partition)) + Ok(target_compaction_group_id) } pub async fn calculate_compaction_group_statistic(&self) -> Vec { @@ -614,71 +641,11 @@ impl HummockManager { ) -> Result<()> { // 1. Due to version compatibility, we fix some of the configuration of older versions after hummock starts. let current_version = &versioning_guard.current_version; - let all_group_ids = get_compaction_group_ids(current_version); - let mut configs = compaction_group_manager - .get_or_insert_compaction_group_configs(&all_group_ids.collect_vec()) - .await?; - - // We've already lowered the default limit for write limit in PR-12183, and to prevent older clusters from continuing to use the outdated configuration, we've introduced a new logic to rewrite it in a uniform way. - let mut rewrite_cg_ids = vec![]; - let mut restore_cg_to_partition_vnode: HashMap> = - HashMap::default(); - for (cg_id, compaction_group_config) in &mut configs { - // update write limit - let relaxed_default_write_stop_level_count = 1000; - if compaction_group_config - .compaction_config - .level0_sub_level_compact_level_count - == relaxed_default_write_stop_level_count - { - rewrite_cg_ids.push(*cg_id); - } - - if let Some(levels) = current_version.levels.get(cg_id) { - if levels.member_table_ids.len() == 1 { - restore_cg_to_partition_vnode.insert( - *cg_id, - vec![( - levels.member_table_ids[0], - compaction_group_config - .compaction_config - .split_weight_by_vnode, - )] - .into_iter() - .collect(), - ); - } - } - } - - if !rewrite_cg_ids.is_empty() { - tracing::info!("Compaction group {:?} configs rewrite ", rewrite_cg_ids); - - // update meta store - let result = compaction_group_manager - .update_compaction_config( - &rewrite_cg_ids, - &[ - MutableConfig::Level0StopWriteThresholdSubLevelNumber( - risingwave_common::config::default::compaction_config::level0_stop_write_threshold_sub_level_number(), - ), - ], - ) - .await?; - - // update memory - for new_config in result { - configs.insert(new_config.group_id(), new_config); - } - } - - compaction_group_manager.write_limit = - calc_new_write_limits(configs, HashMap::new(), &versioning_guard.current_version); - trigger_write_stop_stats(&self.metrics, &compaction_group_manager.write_limit); - tracing::debug!( - "Hummock stopped write: {:#?}", - compaction_group_manager.write_limit - ); + let all_group_ids = get_compaction_group_ids(current_version).collect_vec(); + let default_config = compaction_group_manager.default_compaction_config(); + let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn(); + compaction_groups_txn.try_create_compaction_groups(&all_group_ids, default_config); + commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?; Ok(()) } @@ -693,91 +660,15 @@ impl HummockManager { /// 3. move existent table to new compaction group. pub(super) struct CompactionGroupManager { compaction_groups: BTreeMap, - default_config: CompactionConfig, + default_config: Arc, /// Tables that write limit is trigger for. pub write_limit: HashMap, - meta_store_impl: MetaStoreImpl, -} - -// init method -impl CompactionGroupManager { - async fn init(&mut self) -> Result<()> { - let loaded_compaction_groups: BTreeMap = - match &self.meta_store_impl { - MetaStoreImpl::Kv(meta_store) => CompactionGroup::list(meta_store) - .await? - .into_iter() - .map(|cg| (cg.group_id(), cg)) - .collect(), - MetaStoreImpl::Sql(sql_meta_store) => { - use sea_orm::EntityTrait; - compaction_config::Entity::find() - .all(&sql_meta_store.conn) - .await - .map_err(MetadataModelError::from)? - .into_iter() - .map(|m| (m.compaction_group_id as CompactionGroupId, m.into())) - .collect() - } - }; - if !loaded_compaction_groups.is_empty() { - self.compaction_groups = loaded_compaction_groups; - } - Ok(()) - } - - /// Initializes the config for a group. - /// Should only be used by compaction test. - pub(super) async fn init_compaction_config_for_replay( - &mut self, - group_id: CompactionGroupId, - config: CompactionConfig, - ) -> Result<()> { - let insert = BTreeMapEntryTransaction::new_insert( - &mut self.compaction_groups, - group_id, - CompactionGroup { - group_id, - compaction_config: Arc::new(config), - }, - ); - commit_multi_var!(self.meta_store_impl, insert)?; - Ok(()) - } } impl CompactionGroupManager { - /// Gets compaction group config for `compaction_group_id`, inserts default one if missing. - async fn get_or_insert_compaction_group_config( - &mut self, - compaction_group_id: CompactionGroupId, - ) -> Result { - let r = self - .get_or_insert_compaction_group_configs(&[compaction_group_id]) - .await?; - Ok(r.into_values().next().unwrap()) - } - - /// Gets compaction group configs for `compaction_group_ids`, inserts default one if missing. - async fn get_or_insert_compaction_group_configs( - &mut self, - compaction_group_ids: &[CompactionGroupId], - ) -> Result> { - let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups); - for id in compaction_group_ids { - if compaction_groups.contains_key(id) { - continue; - } - let new_entry = CompactionGroup::new(*id, self.default_config.clone()); - compaction_groups.insert(*id, new_entry); - } - commit_multi_var!(self.meta_store_impl, compaction_groups)?; - - let r = compaction_group_ids - .iter() - .map(|id| (*id, self.compaction_groups[id].clone())) - .collect(); - Ok(r) + /// Starts a transaction to update compaction group configs. + pub fn start_compaction_groups_txn(&mut self) -> CompactionGroupTransaction<'_> { + CompactionGroupTransaction::new(&mut self.compaction_groups) } /// Tries to get compaction group config for `compaction_group_id`. @@ -788,53 +679,10 @@ impl CompactionGroupManager { self.compaction_groups.get(&compaction_group_id).cloned() } - pub(super) fn default_compaction_config(&self) -> CompactionConfig { + /// Tries to get compaction group config for `compaction_group_id`. + pub(super) fn default_compaction_config(&self) -> Arc { self.default_config.clone() } - - pub(super) async fn update_compaction_config( - &mut self, - compaction_group_ids: &[CompactionGroupId], - config_to_update: &[MutableConfig], - ) -> Result> { - let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups); - let mut result = Vec::with_capacity(compaction_group_ids.len()); - for compaction_group_id in compaction_group_ids.iter().unique() { - let group = compaction_groups.get(compaction_group_id).ok_or_else(|| { - Error::CompactionGroup(format!("invalid group {}", *compaction_group_id)) - })?; - let mut config = group.compaction_config.as_ref().clone(); - update_compaction_config(&mut config, config_to_update); - if let Err(reason) = validate_compaction_config(&config) { - return Err(Error::CompactionGroup(reason)); - } - let mut new_group = group.clone(); - new_group.compaction_config = Arc::new(config); - compaction_groups.insert(*compaction_group_id, new_group.clone()); - result.push(new_group); - } - commit_multi_var!(self.meta_store_impl, compaction_groups)?; - Ok(result) - } - - /// Removes stale group configs. - async fn purge(&mut self, existing_groups: HashSet) -> Result<()> { - let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups); - let stale_group = compaction_groups - .tree_ref() - .keys() - .cloned() - .filter(|k| !existing_groups.contains(k)) - .collect_vec(); - if stale_group.is_empty() { - return Ok(()); - } - for group in stale_group { - compaction_groups.remove(group); - } - commit_multi_var!(self.meta_store_impl, compaction_groups)?; - Ok(()) - } } fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfig]) { @@ -896,6 +744,84 @@ fn update_compaction_config(target: &mut CompactionConfig, items: &[MutableConfi } } +impl<'a> CompactionGroupTransaction<'a> { + /// Inserts compaction group configs if they do not exist. + pub fn try_create_compaction_groups( + &mut self, + compaction_group_ids: &[CompactionGroupId], + config: Arc, + ) -> bool { + let mut trivial = true; + for id in compaction_group_ids { + if self.contains_key(id) { + continue; + } + let new_entry = CompactionGroup::new(*id, config.as_ref().clone()); + self.insert(*id, new_entry); + + trivial = false; + } + + !trivial + } + + pub fn create_compaction_groups( + &mut self, + compaction_group_id: CompactionGroupId, + config: Arc, + ) { + self.try_create_compaction_groups(&[compaction_group_id], config); + } + + /// Tries to get compaction group config for `compaction_group_id`. + pub(super) fn try_get_compaction_group_config( + &self, + compaction_group_id: CompactionGroupId, + ) -> Option<&CompactionGroup> { + self.get(&compaction_group_id) + } + + /// Removes stale group configs. + fn purge(&mut self, existing_groups: HashSet) { + let stale_group = self + .tree_ref() + .keys() + .cloned() + .filter(|k| !existing_groups.contains(k)) + .collect_vec(); + if stale_group.is_empty() { + return; + } + for group in stale_group { + self.remove(group); + } + } + + pub(super) fn update_compaction_config( + &mut self, + compaction_group_ids: &[CompactionGroupId], + config_to_update: &[MutableConfig], + ) -> Result> { + let mut results = HashMap::default(); + for compaction_group_id in compaction_group_ids.iter().unique() { + let group = self.get(compaction_group_id).ok_or_else(|| { + Error::CompactionGroup(format!("invalid group {}", *compaction_group_id)) + })?; + let mut config = group.compaction_config.as_ref().clone(); + update_compaction_config(&mut config, config_to_update); + if let Err(reason) = validate_compaction_config(&config) { + return Err(Error::CompactionGroup(reason)); + } + let mut new_group = group.clone(); + new_group.compaction_config = Arc::new(config); + self.insert(*compaction_group_id, new_group.clone()); + results.insert(new_group.group_id(), new_group); + } + + Ok(results) + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -905,8 +831,11 @@ mod tests { use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::meta::table_fragments::Fragment; + use crate::hummock::commit_multi_var; + use crate::hummock::error::Result; use crate::hummock::manager::compaction_group_manager::CompactionGroupManager; use crate::hummock::test_utils::setup_compute_env; + use crate::manager::MetaStoreImpl; use crate::model::TableFragments; #[tokio::test] @@ -914,21 +843,46 @@ mod tests { let (env, ..) = setup_compute_env(8080).await; let mut inner = CompactionGroupManager::new(&env).await.unwrap(); assert_eq!(inner.compaction_groups.len(), 2); - inner - .update_compaction_config(&[100, 200], &[]) + + async fn update_compaction_config( + meta: &MetaStoreImpl, + inner: &mut CompactionGroupManager, + cg_ids: &[u64], + config_to_update: &[MutableConfig], + ) -> Result<()> { + let mut compaction_groups_txn = inner.start_compaction_groups_txn(); + compaction_groups_txn.update_compaction_config(cg_ids, config_to_update)?; + commit_multi_var!(meta, compaction_groups_txn) + } + + async fn insert_compaction_group_configs( + meta: &MetaStoreImpl, + inner: &mut CompactionGroupManager, + cg_ids: &[u64], + ) { + let default_config = inner.default_compaction_config(); + let mut compaction_groups_txn = inner.start_compaction_groups_txn(); + if compaction_groups_txn.try_create_compaction_groups(cg_ids, default_config) { + commit_multi_var!(meta, compaction_groups_txn).unwrap(); + } + } + + update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[]) .await .unwrap_err(); - inner - .get_or_insert_compaction_group_configs(&[100, 200]) - .await - .unwrap(); + insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await; assert_eq!(inner.compaction_groups.len(), 4); let mut inner = CompactionGroupManager::new(&env).await.unwrap(); assert_eq!(inner.compaction_groups.len(), 4); - inner - .update_compaction_config(&[100, 200], &[MutableConfig::MaxSubCompaction(123)]) - .await - .unwrap(); + + update_compaction_config( + env.meta_store_ref(), + &mut inner, + &[100, 200], + &[MutableConfig::MaxSubCompaction(123)], + ) + .await + .unwrap(); assert_eq!(inner.compaction_groups.len(), 4); assert_eq!( inner diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 155dd362c0909..982a94fd5f9db 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -99,7 +99,7 @@ impl HummockManager { ) -> Result<()> { let mut context_info = self.context_info.write().await; context_info - .release_contexts(context_ids, self.meta_store_ref()) + .release_contexts(context_ids, self.env.meta_store()) .await?; #[cfg(test)] { @@ -188,7 +188,7 @@ impl HummockManager { } context_info - .release_contexts(&invalid_context_ids, self.meta_store_ref()) + .release_contexts(&invalid_context_ids, self.env.meta_store()) .await?; Ok(invalid_context_ids) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 47209ddf1fff2..8a49d91a55fc3 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -297,7 +297,7 @@ impl HummockManager { Ok(instance) } - fn meta_store_ref(&self) -> MetaStoreImpl { + fn meta_store_ref(&self) -> &MetaStoreImpl { self.env.meta_store_ref() } @@ -495,22 +495,22 @@ impl HummockManager { ); } + let mut compaction_group_manager = self.compaction_group_manager.write().await; + let mut compaction_groups_txn = compaction_group_manager.start_compaction_groups_txn(); for group in &compaction_groups { let mut pairs = vec![]; for table_id in group.member_table_ids.clone() { pairs.push((table_id as StateTableId, group.id)); } let group_config = group.compaction_config.clone().unwrap(); - self.compaction_group_manager - .write() - .await - .init_compaction_config_for_replay(group.id, group_config) - .await - .unwrap(); + compaction_groups_txn.create_compaction_groups(group.id, Arc::new(group_config)); + self.register_table_ids_for_test(&pairs).await?; tracing::info!("Registered table ids {:?}", pairs); } + commit_multi_var!(self.meta_store_ref(), compaction_groups_txn)?; + // Notify that tables have created for table in table_catalogs { self.env diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 90ba6db61de4e..5b62e5f0694b7 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -100,7 +100,7 @@ fn get_compaction_group_object_ids( } async fn list_pinned_snapshot_from_meta_store(env: &MetaSrvEnv) -> Vec { - match env.meta_store() { + match env.meta_store_ref() { MetaStoreImpl::Kv(meta_store) => HummockPinnedSnapshot::list(meta_store).await.unwrap(), MetaStoreImpl::Sql(sql_meta_store) => { use risingwave_meta_model_v2::hummock_pinned_snapshot; @@ -117,7 +117,7 @@ async fn list_pinned_snapshot_from_meta_store(env: &MetaSrvEnv) -> Vec Vec { - match env.meta_store() { + match env.meta_store_ref() { MetaStoreImpl::Kv(meta_store) => HummockPinnedVersion::list(meta_store).await.unwrap(), MetaStoreImpl::Sql(sql_meta_store) => { use risingwave_meta_model_v2::hummock_pinned_version; diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index 6b68950a28e3b..b7c8cae4b260e 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -565,8 +565,13 @@ impl HummockManager { ) .await; match ret { - Ok((new_group_id, table_vnode_partition_count)) => { - tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, table_vnode_partition_count); + Ok(new_group_id) => { + tracing::info!( + "move state table [{}] from group-{} to group-{} success", + table_id, + parent_group_id, + new_group_id + ); } Err(e) => { tracing::info!( diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 4dd4a257c5b45..876050c36ae6c 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -568,7 +568,7 @@ impl ClusterManagerCore { pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS; async fn new(env: MetaSrvEnv) -> MetaResult { - let meta_store = env.meta_store().as_kv(); + let meta_store = env.meta_store_ref().as_kv(); let mut workers = Worker::list(meta_store).await?; let used_transactional_ids: HashSet<_> = workers diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index 0b0c2ed2a3ced..b623e441c0c22 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -462,11 +462,11 @@ impl MetaSrvEnv { Ok(env) } - pub fn meta_store_ref(&self) -> MetaStoreImpl { + pub fn meta_store(&self) -> MetaStoreImpl { self.meta_store_impl.clone() } - pub fn meta_store(&self) -> &MetaStoreImpl { + pub fn meta_store_ref(&self) -> &MetaStoreImpl { &self.meta_store_impl } diff --git a/src/meta/src/model/catalog.rs b/src/meta/src/model/catalog.rs index c11be01d1a599..8d89080ae2462 100644 --- a/src/meta/src/model/catalog.rs +++ b/src/meta/src/model/catalog.rs @@ -96,7 +96,7 @@ mod tests { #[tokio::test] async fn test_database() -> MetadataModelResult<()> { let env = MetaSrvEnv::for_test().await; - let store = env.meta_store().as_kv(); + let store = env.meta_store_ref().as_kv(); let databases = Database::list(store).await?; assert!(databases.is_empty()); assert!(Database::select(store, &0).await.unwrap().is_none()); diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index 51780b1bc7334..b30c1ba0e5191 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -266,7 +266,7 @@ impl HummockVersion { self.safe_epoch } - pub fn create_init_version(default_compaction_config: CompactionConfig) -> HummockVersion { + pub fn create_init_version(default_compaction_config: Arc) -> HummockVersion { let mut init_version = HummockVersion { id: FIRST_VERSION_ID, levels: Default::default(), @@ -282,7 +282,7 @@ impl HummockVersion { ] { init_version.levels.insert( group_id, - build_initial_compaction_group_levels(group_id, &default_compaction_config), + build_initial_compaction_group_levels(group_id, default_compaction_config.as_ref()), ); } init_version