From 525773dac5ec5b414c94b37089d6488158e616f4 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 3 Jun 2024 17:38:18 +0800 Subject: [PATCH] refactor(storage): refactor compaction group manager trx --- src/meta/node/src/server.rs | 2 +- src/meta/src/hummock/compactor_manager.rs | 2 +- .../manager/compaction_group_manager.rs | 339 ++++++++++-------- src/meta/src/hummock/manager/context.rs | 4 +- src/meta/src/hummock/manager/mod.rs | 13 +- src/meta/src/hummock/manager/timer_task.rs | 15 +- src/meta/src/manager/cluster.rs | 2 +- src/meta/src/manager/env.rs | 4 +- 8 files changed, 215 insertions(+), 166 deletions(-) diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index bf5bb72ed731..6dfa0f91faff 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -654,7 +654,7 @@ pub async fn start_service_as_election_leader( ); let health_srv = HealthServiceImpl::new(); let backup_srv = BackupServiceImpl::new(backup_manager); - let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store_ref()); + let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store()); let system_params_srv = SystemParamsServiceImpl::new(env.system_params_manager_impl_ref()); let session_params_srv = SessionParamsServiceImpl::new(env.session_params_manager_impl_ref()); let serving_srv = diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs index fcbac63817ff..2c5068822898 100644 --- a/src/meta/src/hummock/compactor_manager.rs +++ b/src/meta/src/hummock/compactor_manager.rs @@ -131,7 +131,7 @@ impl CompactorManagerInner { use risingwave_meta_model_v2::compaction_task; use sea_orm::EntityTrait; // Retrieve the existing task assignments from metastore. - let task_assignment: Vec = match env.meta_store() { + let task_assignment: Vec = match env.meta_store_ref() { MetaStoreImpl::Kv(meta_store) => CompactTaskAssignment::list(meta_store).await?, MetaStoreImpl::Sql(sql_meta_store) => compaction_task::Entity::find() .all(&sql_meta_store.conn) diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 320ab37daedb..1ae6d207e8b2 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -72,9 +72,28 @@ impl CompactionGroupManager { compaction_groups: BTreeMap::new(), default_config, write_limit: Default::default(), - meta_store_impl: env.meta_store_ref(), }; - compaction_group_manager.init().await?; + + let loaded_compaction_groups: BTreeMap = + match env.meta_store_ref() { + MetaStoreImpl::Kv(meta_store) => CompactionGroup::list(meta_store) + .await? + .into_iter() + .map(|cg| (cg.group_id(), cg)) + .collect(), + MetaStoreImpl::Sql(sql_meta_store) => { + use sea_orm::EntityTrait; + compaction_config::Entity::find() + .all(&sql_meta_store.conn) + .await + .map_err(MetadataModelError::from)? + .into_iter() + .map(|m| (m.compaction_group_id as CompactionGroupId, m.into())) + .collect() + } + }; + + compaction_group_manager.init(loaded_compaction_groups); Ok(compaction_group_manager) } } @@ -206,18 +225,38 @@ impl HummockManager { .entry(group_id) .or_default() .group_deltas; - let config = self + + let mut config = self .compaction_group_manager - .write() + .read() .await - .get_or_insert_compaction_group_config(group_id) - .await? - .compaction_config - .as_ref() - .clone(); + .try_get_compaction_group_config(group_id); + + if config.is_none() { + if let Some(insert_trx) = self + .compaction_group_manager + .write() + .await + .insert_not_exist_configs_trx(&[group_id]) + { + commit_multi_var!(self.meta_store_ref(), insert_trx)?; + } else { + unreachable!( + "Failed to insert compaction group config for group {}", + group_id + ); + } + + config = self + .compaction_group_manager + .read() + .await + .try_get_compaction_group_config(group_id); + } + group_deltas.push(GroupDelta { delta_type: Some(DeltaType::GroupConstruct(GroupConstruct { - group_config: Some(config), + group_config: Some(config.unwrap().compaction_config.as_ref().clone()), group_id, ..Default::default() })), @@ -313,27 +352,29 @@ impl HummockManager { delta_type: Some(DeltaType::GroupDestroy(GroupDestroy {})), }); } - new_version_delta.pre_apply(); - commit_multi_var!(self.meta_store_ref(), version)?; for group_id in &groups_to_remove { - let max_level = versioning - .current_version + let max_level = new_version_delta + .latest_version() .get_compaction_group_levels(*group_id) .get_levels() .len(); remove_compaction_group_in_sst_stat(&self.metrics, *group_id, max_level); } + new_version_delta.pre_apply(); + // Purge may cause write to meta store. If it hurts performance while holding versioning // lock, consider to make it in batch. - self.compaction_group_manager - .write() - .await - .purge(HashSet::from_iter(get_compaction_group_ids( - &versioning.current_version, - ))) - .await?; + let mut compaction_group_manager = self.compaction_group_manager.write().await; + if let Some(purge_trx) = compaction_group_manager.purge(HashSet::from_iter( + get_compaction_group_ids(version.latest_version()), + )) { + commit_multi_var!(self.meta_store_ref(), version, purge_trx)?; + } else { + commit_multi_var!(self.meta_store_ref(), version)?; + } + Ok(()) } @@ -350,13 +391,13 @@ impl HummockManager { &self, compaction_group_ids: &[CompactionGroupId], config_to_update: &[MutableConfig], - ) -> Result> { - let result = self - .compaction_group_manager - .write() - .await - .update_compaction_config(compaction_group_ids, config_to_update) - .await?; + ) -> Result<()> { + let mut compaction_group_manager = self.compaction_group_manager.write().await; + let update_trx = compaction_group_manager + .update_compaction_config_trx(compaction_group_ids, config_to_update)?; + + commit_multi_var!(self.meta_store_ref(), update_trx)?; + if config_to_update .iter() .any(|c| matches!(c, MutableConfig::Level0StopWriteThresholdSubLevelNumber(_))) @@ -364,7 +405,7 @@ impl HummockManager { self.try_update_write_limits(compaction_group_ids).await; } - Ok(result) + Ok(()) } /// Gets complete compaction group info. @@ -408,21 +449,19 @@ impl HummockManager { ) .await?; - Ok(result.0) + Ok(result) } /// move some table to another compaction-group. Create a new compaction group if it does not /// exist. - /// TODO: Move `table_to_partition` in result to compaction group pub async fn move_state_table_to_compaction_group( &self, parent_group_id: CompactionGroupId, table_ids: &[StateTableId], partition_vnode_count: u32, - ) -> Result<(CompactionGroupId, BTreeMap)> { - let mut table_to_partition = BTreeMap::default(); + ) -> Result { if table_ids.is_empty() { - return Ok((parent_group_id, table_to_partition)); + return Ok(parent_group_id); } let table_ids = table_ids.iter().cloned().unique().collect_vec(); let compaction_guard = self.compaction.write().await; @@ -513,20 +552,11 @@ impl HummockManager { let (new_compaction_group_id, config) = new_group; { let mut compaction_group_manager = self.compaction_group_manager.write().await; - let insert = BTreeMapEntryTransaction::new_insert( - &mut compaction_group_manager.compaction_groups, - new_compaction_group_id, - CompactionGroup { - group_id: new_compaction_group_id, - compaction_config: Arc::new(config), - }, - ); + let insert_trx = + compaction_group_manager.insert_config_trx(new_compaction_group_id, config.clone()); + new_version_delta.pre_apply(); - commit_multi_var!(self.meta_store_ref(), version, insert)?; - // Currently, only splitting out a single table_id is supported. - for table_id in table_ids { - table_to_partition.insert(table_id, partition_vnode_count); - } + commit_multi_var!(self.meta_store_ref(), version, insert_trx)?; } let mut canceled_tasks = vec![]; @@ -564,7 +594,7 @@ impl HummockManager { .with_label_values(&[&parent_group_id.to_string()]) .inc(); - Ok((target_compaction_group_id, table_to_partition)) + Ok(target_compaction_group_id) } pub async fn calculate_compaction_group_statistic(&self) -> Vec { @@ -607,64 +637,49 @@ impl HummockManager { ) -> Result<()> { // 1. Due to version compatibility, we fix some of the configuration of older versions after hummock starts. let current_version = &versioning_guard.current_version; - let all_group_ids = get_compaction_group_ids(current_version); - let mut configs = compaction_group_manager - .get_or_insert_compaction_group_configs(&all_group_ids.collect_vec()) - .await?; + let all_group_ids = get_compaction_group_ids(current_version).collect_vec(); + let compaction_group = + compaction_group_manager.try_get_compaction_group_configs(&all_group_ids); // We've already lowered the default limit for write limit in PR-12183, and to prevent older clusters from continuing to use the outdated configuration, we've introduced a new logic to rewrite it in a uniform way. let mut rewrite_cg_ids = vec![]; - let mut restore_cg_to_partition_vnode: HashMap> = - HashMap::default(); - for (cg_id, compaction_group_config) in &mut configs { + for compaction_group_config in compaction_group { // update write limit + let cg_id = compaction_group_config.group_id; let relaxed_default_write_stop_level_count = 1000; if compaction_group_config .compaction_config .level0_sub_level_compact_level_count == relaxed_default_write_stop_level_count { - rewrite_cg_ids.push(*cg_id); - } - - if let Some(levels) = current_version.levels.get(cg_id) { - if levels.member_table_ids.len() == 1 { - restore_cg_to_partition_vnode.insert( - *cg_id, - vec![( - levels.member_table_ids[0], - compaction_group_config - .compaction_config - .split_weight_by_vnode, - )] - .into_iter() - .collect(), - ); - } + rewrite_cg_ids.push(cg_id); } } if !rewrite_cg_ids.is_empty() { tracing::info!("Compaction group {:?} configs rewrite ", rewrite_cg_ids); - // update meta store - let result = compaction_group_manager - .update_compaction_config( + { + let update_trx = compaction_group_manager + .update_compaction_config_trx( &rewrite_cg_ids, &[ MutableConfig::Level0StopWriteThresholdSubLevelNumber( risingwave_common::config::default::compaction_config::level0_stop_write_threshold_sub_level_number(), ), ], - ) - .await?; + )?; - // update memory - for new_config in result { - configs.insert(new_config.group_id(), new_config); - } + commit_multi_var!(self.meta_store_ref(), update_trx)?; + }; } + let configs = compaction_group_manager + .try_get_compaction_group_configs(&all_group_ids) + .into_iter() + .map(|group| (group.group_id, group)) + .collect(); + compaction_group_manager.write_limit = calc_new_write_limits(configs, HashMap::new(), &versioning_guard.current_version); trigger_write_stop_stats(&self.metrics, &compaction_group_manager.write_limit); @@ -689,88 +704,70 @@ pub(super) struct CompactionGroupManager { default_config: CompactionConfig, /// Tables that write limit is trigger for. pub write_limit: HashMap, - meta_store_impl: MetaStoreImpl, } // init method impl CompactionGroupManager { - async fn init(&mut self) -> Result<()> { - let loaded_compaction_groups: BTreeMap = - match &self.meta_store_impl { - MetaStoreImpl::Kv(meta_store) => CompactionGroup::list(meta_store) - .await? - .into_iter() - .map(|cg| (cg.group_id(), cg)) - .collect(), - MetaStoreImpl::Sql(sql_meta_store) => { - use sea_orm::EntityTrait; - compaction_config::Entity::find() - .all(&sql_meta_store.conn) - .await - .map_err(MetadataModelError::from)? - .into_iter() - .map(|m| (m.compaction_group_id as CompactionGroupId, m.into())) - .collect() - } - }; + fn init(&mut self, loaded_compaction_groups: BTreeMap) { if !loaded_compaction_groups.is_empty() { self.compaction_groups = loaded_compaction_groups; } - Ok(()) } /// Initializes the config for a group. /// Should only be used by compaction test. - pub(super) async fn init_compaction_config_for_replay( + pub(super) fn init_compaction_config_for_replay_trx( &mut self, group_id: CompactionGroupId, config: CompactionConfig, - ) -> Result<()> { - let insert = BTreeMapEntryTransaction::new_insert( + ) -> BTreeMapEntryTransaction<'_, u64, CompactionGroup> { + BTreeMapEntryTransaction::new_insert( &mut self.compaction_groups, group_id, CompactionGroup { group_id, compaction_config: Arc::new(config), }, - ); - commit_multi_var!(self.meta_store_impl, insert)?; - Ok(()) + ) } } impl CompactionGroupManager { - /// Gets compaction group config for `compaction_group_id`, inserts default one if missing. - async fn get_or_insert_compaction_group_config( - &mut self, - compaction_group_id: CompactionGroupId, - ) -> Result { - let r = self - .get_or_insert_compaction_group_configs(&[compaction_group_id]) - .await?; - Ok(r.into_values().next().unwrap()) - } - - /// Gets compaction group configs for `compaction_group_ids`, inserts default one if missing. - async fn get_or_insert_compaction_group_configs( + /// Inserts compaction group configs if they do not exist. + pub fn insert_not_exist_configs_trx( &mut self, compaction_group_ids: &[CompactionGroupId], - ) -> Result> { + ) -> Option> { let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups); + let mut is_empty = true; for id in compaction_group_ids { if compaction_groups.contains_key(id) { continue; } let new_entry = CompactionGroup::new(*id, self.default_config.clone()); compaction_groups.insert(*id, new_entry); + + if is_empty { + is_empty = false; + } } - commit_multi_var!(self.meta_store_impl, compaction_groups)?; - let r = compaction_group_ids - .iter() - .map(|id| (*id, self.compaction_groups[id].clone())) - .collect(); - Ok(r) + if is_empty { + return None; + } + + Some(compaction_groups) + } + + pub fn insert_config_trx( + &mut self, + compaction_group_id: CompactionGroupId, + config: CompactionConfig, + ) -> BTreeMapTransaction<'_, u64, CompactionGroup> { + let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups); + let new_entry = CompactionGroup::new(compaction_group_id, config); + compaction_groups.insert(compaction_group_id, new_entry); + compaction_groups } /// Tries to get compaction group config for `compaction_group_id`. @@ -781,17 +778,32 @@ impl CompactionGroupManager { self.compaction_groups.get(&compaction_group_id).cloned() } + /// Tries to get compaction group config for `compaction_group_id`. + pub(super) fn try_get_compaction_group_configs( + &self, + compaction_group_ids: &Vec, + ) -> Vec { + let mut result = Vec::with_capacity(compaction_group_ids.len()); + + compaction_group_ids.iter().for_each(|id| { + if let Some(group) = self.compaction_groups.get(id) { + result.push(group.clone()); + } + }); + + result + } + pub(super) fn default_compaction_config(&self) -> CompactionConfig { self.default_config.clone() } - pub(super) async fn update_compaction_config( + pub(super) fn update_compaction_config_trx( &mut self, compaction_group_ids: &[CompactionGroupId], config_to_update: &[MutableConfig], - ) -> Result> { + ) -> Result> { let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups); - let mut result = Vec::with_capacity(compaction_group_ids.len()); for compaction_group_id in compaction_group_ids.iter().unique() { let group = compaction_groups.get(compaction_group_id).ok_or_else(|| { Error::CompactionGroup(format!("invalid group {}", *compaction_group_id)) @@ -804,14 +816,17 @@ impl CompactionGroupManager { let mut new_group = group.clone(); new_group.compaction_config = Arc::new(config); compaction_groups.insert(*compaction_group_id, new_group.clone()); - result.push(new_group); + // result.push(new_group); } - commit_multi_var!(self.meta_store_impl, compaction_groups)?; - Ok(result) + + Ok(compaction_groups) } /// Removes stale group configs. - async fn purge(&mut self, existing_groups: HashSet) -> Result<()> { + fn purge( + &mut self, + existing_groups: HashSet, + ) -> Option> { let mut compaction_groups = BTreeMapTransaction::new(&mut self.compaction_groups); let stale_group = compaction_groups .tree_ref() @@ -820,13 +835,13 @@ impl CompactionGroupManager { .filter(|k| !existing_groups.contains(k)) .collect_vec(); if stale_group.is_empty() { - return Ok(()); + return None; } for group in stale_group { compaction_groups.remove(group); } - commit_multi_var!(self.meta_store_impl, compaction_groups)?; - Ok(()) + + Some(compaction_groups) } } @@ -896,8 +911,11 @@ mod tests { use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::meta::table_fragments::Fragment; + use crate::hummock::commit_multi_var; + use crate::hummock::error::Result; use crate::hummock::manager::compaction_group_manager::CompactionGroupManager; use crate::hummock::test_utils::setup_compute_env; + use crate::manager::MetaStoreImpl; use crate::model::TableFragments; #[tokio::test] @@ -905,21 +923,46 @@ mod tests { let (env, ..) = setup_compute_env(8080).await; let mut inner = CompactionGroupManager::new(&env).await.unwrap(); assert_eq!(inner.compaction_groups.len(), 2); - inner - .update_compaction_config(&[100, 200], &[]) - .await - .unwrap_err(); - inner - .get_or_insert_compaction_group_configs(&[100, 200]) + + async fn update_compaction_config( + meta: &MetaStoreImpl, + inner: &mut CompactionGroupManager, + cg_ids: &[u64], + config_to_update: &[MutableConfig], + ) -> Result<()> { + let update_trx = inner + .update_compaction_config_trx(cg_ids, config_to_update) + .unwrap(); + + commit_multi_var!(meta, update_trx) + } + + async fn insert_compaction_group_configs( + meta: &MetaStoreImpl, + inner: &mut CompactionGroupManager, + cg_ids: &[u64], + ) { + if let Some(insert_trx) = inner.insert_not_exist_configs_trx(cg_ids) { + commit_multi_var!(meta, insert_trx).unwrap(); + } + } + + update_compaction_config(env.meta_store_ref(), &mut inner, &[100, 200], &[]) .await .unwrap(); + insert_compaction_group_configs(env.meta_store_ref(), &mut inner, &[100, 200]).await; assert_eq!(inner.compaction_groups.len(), 4); let mut inner = CompactionGroupManager::new(&env).await.unwrap(); assert_eq!(inner.compaction_groups.len(), 4); - inner - .update_compaction_config(&[100, 200], &[MutableConfig::MaxSubCompaction(123)]) - .await - .unwrap(); + + update_compaction_config( + env.meta_store_ref(), + &mut inner, + &[100, 200], + &[MutableConfig::MaxSubCompaction(123)], + ) + .await + .unwrap(); assert_eq!(inner.compaction_groups.len(), 4); assert_eq!( inner diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 155dd362c090..982a94fd5f9d 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -99,7 +99,7 @@ impl HummockManager { ) -> Result<()> { let mut context_info = self.context_info.write().await; context_info - .release_contexts(context_ids, self.meta_store_ref()) + .release_contexts(context_ids, self.env.meta_store()) .await?; #[cfg(test)] { @@ -188,7 +188,7 @@ impl HummockManager { } context_info - .release_contexts(&invalid_context_ids, self.meta_store_ref()) + .release_contexts(&invalid_context_ids, self.env.meta_store()) .await?; Ok(invalid_context_ids) diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index b42a9d091071..35799efe0158 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -297,7 +297,7 @@ impl HummockManager { Ok(instance) } - fn meta_store_ref(&self) -> MetaStoreImpl { + fn meta_store_ref(&self) -> &MetaStoreImpl { self.env.meta_store_ref() } @@ -501,12 +501,11 @@ impl HummockManager { pairs.push((table_id as StateTableId, group.id)); } let group_config = group.compaction_config.clone().unwrap(); - self.compaction_group_manager - .write() - .await - .init_compaction_config_for_replay(group.id, group_config) - .await - .unwrap(); + let mut compaction_group_manager = self.compaction_group_manager.write().await; + let insert_trx = compaction_group_manager + .init_compaction_config_for_replay_trx(group.id, group_config); + commit_multi_var!(self.meta_store_ref(), insert_trx)?; + self.register_table_ids(&pairs).await?; tracing::info!("Registered table ids {:?}", pairs); } diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index e28a2d2e9fb0..c790b94a4406 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -205,8 +205,10 @@ impl HummockManager { let versioning_guard = hummock_manager.versioning.read().await; - let configs = - hummock_manager.get_compaction_group_map().await; + let configs: std::collections::BTreeMap< + u64, + crate::hummock::model::CompactionGroup, + > = hummock_manager.get_compaction_group_map().await; let versioning_deref = versioning_guard; ( versioning_deref.current_version.clone(), @@ -556,8 +558,13 @@ impl HummockManager { ) .await; match ret { - Ok((new_group_id, table_vnode_partition_count)) => { - tracing::info!("move state table [{}] from group-{} to group-{} success table_vnode_partition_count {:?}", table_id, parent_group_id, new_group_id, table_vnode_partition_count); + Ok(new_group_id) => { + tracing::info!( + "move state table [{}] from group-{} to group-{} success", + table_id, + parent_group_id, + new_group_id + ); } Err(e) => { tracing::info!( diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 4dd4a257c5b4..876050c36ae6 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -568,7 +568,7 @@ impl ClusterManagerCore { pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS; async fn new(env: MetaSrvEnv) -> MetaResult { - let meta_store = env.meta_store().as_kv(); + let meta_store = env.meta_store_ref().as_kv(); let mut workers = Worker::list(meta_store).await?; let used_transactional_ids: HashSet<_> = workers diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index af7218219afc..f3d2e44f1fa0 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -462,11 +462,11 @@ impl MetaSrvEnv { Ok(env) } - pub fn meta_store_ref(&self) -> MetaStoreImpl { + pub fn meta_store(&self) -> MetaStoreImpl { self.meta_store_impl.clone() } - pub fn meta_store(&self) -> &MetaStoreImpl { + pub fn meta_store_ref(&self) -> &MetaStoreImpl { &self.meta_store_impl }