diff --git a/src/common/metrics/src/monitor/rwlock.rs b/src/common/metrics/src/monitor/rwlock.rs index 46f9d5edea91..4d65c5380110 100644 --- a/src/common/metrics/src/monitor/rwlock.rs +++ b/src/common/metrics/src/monitor/rwlock.rs @@ -16,31 +16,34 @@ use prometheus::HistogramVec; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; pub struct MonitoredRwLock { + // labels: [lock_name, lock_type] metrics: HistogramVec, inner: RwLock, + lock_name: &'static str, } impl MonitoredRwLock { - pub fn new(metrics: HistogramVec, val: T) -> Self { + pub fn new(metrics: HistogramVec, val: T, lock_name: &'static str) -> Self { Self { metrics, inner: RwLock::new(val), + lock_name, } } - pub async fn read<'a, 'b>( - &'a self, - label_values: &'b [&'static str], - ) -> RwLockReadGuard<'a, T> { - let _timer = self.metrics.with_label_values(label_values).start_timer(); + pub async fn read(&self) -> RwLockReadGuard<'_, T> { + let _timer = self + .metrics + .with_label_values(&[self.lock_name, "read"]) + .start_timer(); self.inner.read().await } - pub async fn write<'a, 'b>( - &'a self, - label_values: &'b [&'static str], - ) -> RwLockWriteGuard<'a, T> { - let _timer = self.metrics.with_label_values(label_values).start_timer(); + pub async fn write(&self) -> RwLockWriteGuard<'_, T> { + let _timer = self + .metrics + .with_label_values(&[self.lock_name, "write"]) + .start_timer(); self.inner.write().await } } diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index d20df896f0a1..70bbef6bd3db 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -17,7 +17,6 @@ use std::ops::Bound::{Excluded, Included}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::Ordering; -use function_name::named; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ object_size_map, summarize_group_deltas, }; @@ -32,7 +31,6 @@ use tracing::warn; use crate::hummock::error::Result; use crate::hummock::manager::versioning::Versioning; -use crate::hummock::manager::{read_lock, write_lock}; use crate::hummock::metrics_utils::{trigger_gc_stat, trigger_split_stat}; use crate::hummock::HummockManager; @@ -122,11 +120,10 @@ impl HummockManager { /// Returns the diff between new and old checkpoint id. /// Note that this method must not be called concurrently, because internally it doesn't hold /// lock throughout the method. - #[named] pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result { let timer = self.metrics.version_checkpoint_latency.start_timer(); // 1. hold read lock and create new checkpoint - let versioning_guard = read_lock!(self, versioning).await; + let versioning_guard = self.versioning.read().await; let versioning: &Versioning = versioning_guard.deref(); let current_version: &HummockVersion = &versioning.current_version; let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint; @@ -137,9 +134,10 @@ impl HummockManager { } if cfg!(test) && new_checkpoint_id == old_checkpoint_id { drop(versioning_guard); - let mut versioning = write_lock!(self, versioning).await; - versioning.mark_objects_for_deletion(); - let min_pinned_version_id = versioning.min_pinned_version_id(); + let versioning = self.versioning.read().await; + let context_info = self.context_info.read().await; + versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker); + let min_pinned_version_id = context_info.min_pinned_version_id(); trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id); return Ok(0); } @@ -224,16 +222,17 @@ impl HummockManager { } } // 3. hold write lock and update in memory state - let mut versioning_guard = write_lock!(self, versioning).await; + let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); + let context_info = self.context_info.read().await; assert!(new_checkpoint.version.id > versioning.checkpoint.version.id); versioning.checkpoint = new_checkpoint; // Not delete stale objects when archive is enabled if !self.env.opts.enable_hummock_data_archive { - versioning.mark_objects_for_deletion(); + versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker); } - let min_pinned_version_id = versioning.min_pinned_version_id(); + let min_pinned_version_id = context_info.min_pinned_version_id(); trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id); trigger_split_stat(&self.metrics, &versioning.current_version); drop(versioning_guard); @@ -260,9 +259,8 @@ impl HummockManager { self.pause_version_checkpoint.load(Ordering::Relaxed) } - #[named] pub async fn get_checkpoint_version(&self) -> HummockVersion { - let versioning_guard = read_lock!(self, versioning).await; + let versioning_guard = self.versioning.read().await; versioning_guard.checkpoint.version.clone() } } diff --git a/src/meta/src/hummock/manager/compaction.rs b/src/meta/src/hummock/manager/compaction.rs index ba9954607ab7..369799a8d7b0 100644 --- a/src/meta/src/hummock/manager/compaction.rs +++ b/src/meta/src/hummock/manager/compaction.rs @@ -15,7 +15,6 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; -use function_name::named; use futures::future::Shared; use itertools::Itertools; use risingwave_hummock_sdk::{CompactionGroupId, HummockCompactionTaskId}; @@ -37,7 +36,7 @@ use tokio::sync::oneshot::Receiver as OneShotReceiver; use crate::hummock::compaction::selector::level_selector::PickerInfo; use crate::hummock::compaction::selector::DynamicLevelSelectorCore; use crate::hummock::compaction::{CompactStatus, CompactionDeveloperConfig, CompactionSelector}; -use crate::hummock::manager::{init_selectors, read_lock}; +use crate::hummock::manager::init_selectors; use crate::hummock::HummockManager; const MAX_SKIP_TIMES: usize = 8; @@ -54,17 +53,12 @@ pub struct Compaction { } impl HummockManager { - #[named] pub async fn get_assigned_compact_task_num(&self) -> u64 { - read_lock!(self, compaction) - .await - .compact_task_assignment - .len() as u64 + self.compaction.read().await.compact_task_assignment.len() as u64 } - #[named] pub async fn list_all_tasks_ids(&self) -> Vec { - let compaction = read_lock!(self, compaction).await; + let compaction = self.compaction.read().await; compaction .compaction_statuses @@ -77,11 +71,10 @@ impl HummockManager { .collect_vec() } - #[named] pub async fn list_compaction_status( &self, ) -> (Vec, Vec) { - let compaction = read_lock!(self, compaction).await; + let compaction = self.compaction.read().await; ( compaction.compaction_statuses.values().map_into().collect(), compaction @@ -92,14 +85,13 @@ impl HummockManager { ) } - #[named] pub async fn get_compaction_scores( &self, compaction_group_id: CompactionGroupId, ) -> Vec { let (status, levels, group) = { - let compaction = read_lock!(self, compaction).await; - let versioning = read_lock!(self, versioning).await; + let compaction = self.compaction.read().await; + let versioning = self.versioning.read().await; let config_manager = self.compaction_group_manager.read().await; match ( compaction.compaction_statuses.get(&compaction_group_id), diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 2a9b4eca28fe..c98700792a88 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -16,7 +16,6 @@ use std::collections::{BTreeMap, HashMap, HashSet}; use std::ops::DerefMut; use std::sync::Arc; -use function_name::named; use itertools::Itertools; use risingwave_common::catalog::TableId; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ @@ -31,19 +30,19 @@ use risingwave_pb::hummock::group_delta::DeltaType; use risingwave_pb::hummock::hummock_version_delta::GroupDeltas; use risingwave_pb::hummock::rise_ctl_update_compaction_config_request::mutable_config::MutableConfig; use risingwave_pb::hummock::subscribe_compaction_event_request::ReportTask; +use risingwave_pb::hummock::write_limits::WriteLimit; use risingwave_pb::hummock::{ compact_task, CompactionConfig, CompactionGroupInfo, CompatibilityVersion, GroupConstruct, GroupDelta, GroupDestroy, GroupMetaChange, }; use thiserror_ext::AsReport; -use tokio::sync::{OnceCell, RwLock}; +use tokio::sync::OnceCell; -use super::write_lock; use crate::hummock::compaction::compaction_config::{ validate_compaction_config, CompactionConfigBuilder, }; use crate::hummock::error::{Error, Result}; -use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, HummockManager}; +use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, HummockManager}; use crate::hummock::metrics_utils::remove_compaction_group_in_sst_stat; use crate::hummock::model::CompactionGroup; use crate::hummock::sequence::{next_compaction_group_id, next_sstable_object_id}; @@ -57,7 +56,7 @@ use crate::storage::MetaStore; impl HummockManager { pub(super) async fn build_compaction_group_manager( env: &MetaSrvEnv, - ) -> Result> { + ) -> Result { let default_config = match env.opts.compaction_config.as_ref() { None => CompactionConfigBuilder::new().build(), Some(opt) => CompactionConfigBuilder::with_opt(opt).build(), @@ -68,21 +67,21 @@ impl HummockManager { pub(super) async fn build_compaction_group_manager_with_config( env: &MetaSrvEnv, default_config: CompactionConfig, - ) -> Result> { - let compaction_group_manager = RwLock::new(CompactionGroupManager { + ) -> Result { + let mut compaction_group_manager = CompactionGroupManager { compaction_groups: BTreeMap::new(), default_config, + write_limit: Default::default(), meta_store_impl: env.meta_store_ref(), - }); - compaction_group_manager.write().await.init().await?; + }; + compaction_group_manager.init().await?; Ok(compaction_group_manager) } /// Should not be called inside [`HummockManager`], because it requests locks internally. /// The implementation acquires `versioning` lock. - #[named] pub async fn compaction_group_ids(&self) -> Vec { - get_compaction_group_ids(&read_lock!(self, versioning).await.current_version).collect_vec() + get_compaction_group_ids(&self.versioning.read().await.current_version).collect_vec() } /// The implementation acquires `compaction_group_manager` lock. @@ -141,10 +140,9 @@ impl HummockManager { /// Unregisters stale members and groups /// The caller should ensure `table_fragments_list` remain unchanged during `purge`. /// Currently `purge` is only called during meta service start ups. - #[named] pub async fn purge(&self, valid_ids: &[u32]) -> Result<()> { let registered_members = - get_member_table_ids(&read_lock!(self, versioning).await.current_version); + get_member_table_ids(&self.versioning.read().await.current_version); let to_unregister = registered_members .into_iter() .filter(|table_id| !valid_ids.contains(table_id)) @@ -155,7 +153,6 @@ impl HummockManager { } /// The implementation acquires `versioning` lock. - #[named] pub async fn register_table_ids( &self, pairs: &[(StateTableId, CompactionGroupId)], @@ -163,7 +160,7 @@ impl HummockManager { if pairs.is_empty() { return Ok(()); } - let mut versioning_guard = write_lock!(self, versioning).await; + let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); let current_version = &versioning.current_version; @@ -250,12 +247,11 @@ impl HummockManager { Ok(()) } - #[named] pub async fn unregister_table_ids(&self, table_ids: &[StateTableId]) -> Result<()> { if table_ids.is_empty() { return Ok(()); } - let mut versioning_guard = write_lock!(self, versioning).await; + let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); let current_version = &versioning.current_version; let mut new_version_delta = create_trx_wrapper!( @@ -383,9 +379,8 @@ impl HummockManager { /// Gets complete compaction group info. /// It is the aggregate of `HummockVersion` and `CompactionGroupConfig` - #[named] pub async fn list_compaction_group(&self) -> Vec { - let mut versioning_guard = write_lock!(self, versioning).await; + let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); let current_version = &versioning.current_version; let mut compaction_groups = vec![]; @@ -431,8 +426,7 @@ impl HummockManager { /// move some table to another compaction-group. Create a new compaction group if it does not /// exist. - /// TODO: Move table_to_partition in result to compaction group - #[named] + /// TODO: Move `table_to_partition` in result to compaction group pub async fn move_state_table_to_compaction_group( &self, parent_group_id: CompactionGroupId, @@ -444,8 +438,8 @@ impl HummockManager { return Ok((parent_group_id, table_to_partition)); } let table_ids = table_ids.iter().cloned().unique().collect_vec(); - let compaction_guard = write_lock!(self, compaction).await; - let mut versioning_guard = write_lock!(self, versioning).await; + let compaction_guard = self.compaction.write().await; + let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); let current_version = &versioning.current_version; // Validate parameters. @@ -604,11 +598,10 @@ impl HummockManager { Ok((target_compaction_group_id, table_to_partition)) } - #[named] pub async fn calculate_compaction_group_statistic(&self) -> Vec { let mut infos = vec![]; { - let versioning_guard = read_lock!(self, versioning).await; + let versioning_guard = self.versioning.read().await; let version = &versioning_guard.current_version; for (group_id, group) in &version.levels { let mut group_info = TableGroupInfo { @@ -649,6 +642,8 @@ impl HummockManager { pub(super) struct CompactionGroupManager { compaction_groups: BTreeMap, default_config: CompactionConfig, + /// Tables that write limit is trigger for. + pub write_limit: HashMap, meta_store_impl: MetaStoreImpl, } @@ -871,38 +866,30 @@ mod tests { #[tokio::test] async fn test_inner() { let (env, ..) = setup_compute_env(8080).await; - let inner = HummockManager::build_compaction_group_manager(&env) + let mut inner = HummockManager::build_compaction_group_manager(&env) .await .unwrap(); - assert_eq!(inner.read().await.compaction_groups.len(), 2); + assert_eq!(inner.compaction_groups.len(), 2); inner - .write() - .await .update_compaction_config(&[100, 200], &[]) .await .unwrap_err(); inner - .write() - .await .get_or_insert_compaction_group_configs(&[100, 200]) .await .unwrap(); - assert_eq!(inner.read().await.compaction_groups.len(), 4); - let inner = HummockManager::build_compaction_group_manager(&env) + assert_eq!(inner.compaction_groups.len(), 4); + let mut inner = HummockManager::build_compaction_group_manager(&env) .await .unwrap(); - assert_eq!(inner.read().await.compaction_groups.len(), 4); + assert_eq!(inner.compaction_groups.len(), 4); inner - .write() - .await .update_compaction_config(&[100, 200], &[MutableConfig::MaxSubCompaction(123)]) .await .unwrap(); - assert_eq!(inner.read().await.compaction_groups.len(), 4); + assert_eq!(inner.compaction_groups.len(), 4); assert_eq!( inner - .read() - .await .try_get_compaction_group_config(100) .unwrap() .compaction_config @@ -911,8 +898,6 @@ mod tests { ); assert_eq!( inner - .read() - .await .try_get_compaction_group_config(200) .unwrap() .compaction_config diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 21aeb8402838..a388f9d47213 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -12,35 +12,41 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; -use std::ops::DerefMut; +use std::collections::{BTreeMap, HashMap, HashSet}; use fail::fail_point; -use function_name::named; use itertools::Itertools; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ - ExtendedSstableInfo, HummockContextId, HummockEpoch, HummockSstableObjectId, + ExtendedSstableInfo, HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, }; -use risingwave_pb::hummock::ValidationTask; +use risingwave_pb::hummock::{HummockPinnedSnapshot, HummockPinnedVersion, ValidationTask}; use crate::hummock::error::{Error, Result}; use crate::hummock::manager::{ - commit_multi_var, create_trx_wrapper, read_lock, start_measure_real_process_timer, write_lock, + commit_multi_var, create_trx_wrapper, start_measure_real_process_timer, }; use crate::hummock::HummockManager; -use crate::manager::META_NODE_ID; +use crate::manager::{MetaStoreImpl, MetadataManager, META_NODE_ID}; use crate::model::{BTreeMapTransaction, BTreeMapTransactionWrapper, ValTransaction}; use crate::storage::MetaStore; -impl HummockManager { +#[derive(Default)] +pub(super) struct ContextInfo { + pub pinned_versions: BTreeMap, + pub pinned_snapshots: BTreeMap, + /// `version_safe_points` is similar to `pinned_versions` expect for being a transient state. + pub version_safe_points: Vec, +} + +impl ContextInfo { /// Release resources pinned by these contexts, including: /// - Version /// - Snapshot - #[named] - pub async fn release_contexts( - &self, + async fn release_contexts( + &mut self, context_ids: impl AsRef<[HummockContextId]>, + meta_store_ref: MetaStoreImpl, ) -> Result<()> { fail_point!("release_contexts_metastore_err", |_| Err(Error::MetaStore( anyhow::anyhow!("failpoint metastore error") @@ -49,50 +55,77 @@ impl HummockManager { anyhow::anyhow!("failpoint internal error") ))); - let mut versioning_guard = write_lock!(self, versioning).await; - let versioning = versioning_guard.deref_mut(); let mut pinned_versions = create_trx_wrapper!( - self.meta_store_ref(), + meta_store_ref, BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning.pinned_versions,) + BTreeMapTransaction::new(&mut self.pinned_versions,) ); let mut pinned_snapshots = create_trx_wrapper!( - self.meta_store_ref(), + meta_store_ref, BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning.pinned_snapshots,) + BTreeMapTransaction::new(&mut self.pinned_snapshots,) ); for context_id in context_ids.as_ref() { pinned_versions.remove(*context_id); pinned_snapshots.remove(*context_id); } - commit_multi_var!(self.meta_store_ref(), pinned_versions, pinned_snapshots)?; + commit_multi_var!(meta_store_ref, pinned_versions, pinned_snapshots)?; + Ok(()) + } +} + +impl HummockManager { + pub async fn release_contexts( + &self, + context_ids: impl AsRef<[HummockContextId]>, + ) -> Result<()> { + let mut context_info = self.context_info.write().await; + context_info + .release_contexts(context_ids, self.meta_store_ref()) + .await?; #[cfg(test)] { - drop(versioning_guard); + drop(context_info); self.check_state_consistency().await; } - Ok(()) } /// Checks whether `context_id` is valid. pub async fn check_context(&self, context_id: HummockContextId) -> Result { - Ok(self - .metadata_manager() + self.context_info + .read() + .await + .check_context(context_id, &self.metadata_manager) + .await + } +} + +impl ContextInfo { + /// Checks whether `context_id` is valid. + /// + /// Need `&self` to sync with `release_context` + pub(super) async fn check_context( + &self, + context_id: HummockContextId, + metadata_manager: &MetadataManager, + ) -> Result { + Ok(metadata_manager .get_worker_by_id(context_id) .await .map_err(|err| Error::MetaStore(err.into()))? .is_some()) } +} +impl HummockManager { /// Release invalid contexts, aka worker node ids which are no longer valid in `ClusterManager`. - #[named] pub(super) async fn release_invalid_contexts(&self) -> Result> { - let active_context_ids = { - let compaction_guard = read_lock!(self, compaction).await; - let versioning_guard = read_lock!(self, versioning).await; - let _timer = start_measure_real_process_timer!(self); + let (active_context_ids, mut context_info) = { + let compaction_guard = self.compaction.read().await; + let context_info = self.context_info.write().await; + let _timer = start_measure_real_process_timer!(self, "release_invalid_contexts"); let mut active_context_ids = HashSet::new(); active_context_ids.extend( compaction_guard @@ -100,19 +133,24 @@ impl HummockManager { .values() .map(|c| c.context_id), ); - active_context_ids.extend(versioning_guard.pinned_versions.keys()); - active_context_ids.extend(versioning_guard.pinned_snapshots.keys()); - active_context_ids + active_context_ids.extend(context_info.pinned_versions.keys()); + active_context_ids.extend(context_info.pinned_snapshots.keys()); + (active_context_ids, context_info) }; let mut invalid_context_ids = vec![]; for active_context_id in &active_context_ids { - if !self.check_context(*active_context_id).await? { + if !context_info + .check_context(*active_context_id, &self.metadata_manager) + .await? + { invalid_context_ids.push(*active_context_id); } } - self.release_contexts(&invalid_context_ids).await?; + context_info + .release_contexts(&invalid_context_ids, self.meta_store_ref()) + .await?; Ok(invalid_context_ids) } @@ -133,7 +171,13 @@ impl HummockManager { continue; } } - if !self.check_context(*context_id).await? { + if !self + .context_info + .read() + .await + .check_context(*context_id, &self.metadata_manager) + .await? + { return Err(Error::InvalidSst(*sst_id)); } } diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index dafd3231afcc..54f5e7120388 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -19,9 +19,9 @@ use std::ops::DerefMut; use std::time::Duration; use anyhow::Context; -use function_name::named; use futures::{stream, StreamExt}; use itertools::Itertools; +use parking_lot::Mutex; use risingwave_hummock_sdk::HummockSstableObjectId; use risingwave_pb::common::worker_node::State::Running; use risingwave_pb::common::WorkerType; @@ -29,32 +29,54 @@ use risingwave_pb::hummock::subscribe_compaction_event_response::Event as Respon use risingwave_pb::hummock::FullScanTask; use crate::hummock::error::{Error, Result}; -use crate::hummock::manager::versioning::Versioning; -use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, write_lock}; +use crate::hummock::manager::{commit_multi_var, create_trx_wrapper}; use crate::hummock::HummockManager; use crate::manager::MetadataManager; use crate::model::{BTreeMapTransaction, BTreeMapTransactionWrapper, ValTransaction}; use crate::storage::MetaStore; +#[derive(Default)] +pub(super) struct DeleteObjectTracker { + /// Objects that waits to be deleted from object store. It comes from either compaction, or + /// full GC (listing object store). + objects_to_delete: Mutex>, +} + +impl DeleteObjectTracker { + pub(super) fn add(&self, objects: impl Iterator) { + self.objects_to_delete.lock().extend(objects) + } + + pub(super) fn current(&self) -> HashSet { + self.objects_to_delete.lock().clone() + } + + pub(super) fn clear(&self) { + self.objects_to_delete.lock().clear(); + } + + pub(super) fn ack<'a>(&self, objects: impl Iterator) { + let mut lock = self.objects_to_delete.lock(); + for object in objects { + lock.remove(object); + } + } +} + impl HummockManager { /// Gets SST objects that is safe to be deleted from object store. - #[named] - pub async fn get_objects_to_delete(&self) -> Vec { - read_lock!(self, versioning) - .await - .objects_to_delete + pub fn get_objects_to_delete(&self) -> Vec { + self.delete_object_tracker + .current() .iter() .cloned() .collect_vec() } /// Acknowledges SSTs have been deleted from object store. - #[named] pub async fn ack_deleted_objects(&self, object_ids: &[HummockSstableObjectId]) -> Result<()> { - let mut versioning_guard = write_lock!(self, versioning).await; - for object_id in object_ids { - versioning_guard.objects_to_delete.remove(object_id); - } + self.delete_object_tracker.ack(object_ids.iter()); + let mut versioning_guard = self.versioning.write().await; for stale_objects in versioning_guard.checkpoint.stale_objects.values_mut() { stale_objects.id.retain(|id| !object_ids.contains(id)); } @@ -69,10 +91,10 @@ impl HummockManager { /// Deletes at most `batch_size` deltas. /// /// Returns (number of deleted deltas, number of remain `deltas_to_delete`). - #[named] pub async fn delete_version_deltas(&self, batch_size: usize) -> Result<(usize, usize)> { - let mut versioning_guard = write_lock!(self, versioning).await; + let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); + let context_info = self.context_info.read().await; let deltas_to_delete = versioning .hummock_version_deltas .range(..=versioning.checkpoint.version.id) @@ -80,7 +102,7 @@ impl HummockManager { .collect_vec(); // If there is any safe point, skip this to ensure meta backup has required delta logs to // replay version. - if !versioning.version_safe_points.is_empty() { + if !context_info.version_safe_points.is_empty() { return Ok((0, deltas_to_delete.len())); } let mut hummock_version_deltas = create_trx_wrapper!( @@ -102,6 +124,7 @@ impl HummockManager { commit_multi_var!(self.meta_store_ref(), hummock_version_deltas)?; #[cfg(test)] { + drop(context_info); drop(versioning_guard); self.check_state_consistency().await; } @@ -110,16 +133,15 @@ impl HummockManager { /// Extends `objects_to_delete` according to object store full scan result. /// Caller should ensure `object_ids` doesn't include any SST objects belong to a on-going - /// version write. That's to say, these object_ids won't appear in either `commit_epoch` or + /// version write. That's to say, these `object_ids` won't appear in either `commit_epoch` or /// `report_compact_task`. - #[named] pub async fn extend_objects_to_delete_from_scan( &self, object_ids: &[HummockSstableObjectId], ) -> usize { let tracked_object_ids: HashSet = { - let versioning_guard = read_lock!(self, versioning).await; - let versioning: &Versioning = &versioning_guard; + let versioning = self.versioning.read().await; + let context_info = self.context_info.read().await; // object ids in checkpoint version let mut tracked_object_ids = versioning.checkpoint.version.get_object_ids(); @@ -131,7 +153,7 @@ impl HummockManager { tracked_object_ids.extend(delta.newly_added_object_ids()); } // add stale object ids before the checkpoint version - let min_pinned_version_id = versioning.min_pinned_version_id(); + let min_pinned_version_id = context_info.min_pinned_version_id(); tracked_object_ids.extend( versioning .checkpoint @@ -147,9 +169,8 @@ impl HummockManager { .iter() .filter(|object_id| !tracked_object_ids.contains(object_id)) .collect_vec(); - let mut versioning_guard = write_lock!(self, versioning).await; - versioning_guard.objects_to_delete.extend(to_delete.clone()); - drop(versioning_guard); + self.delete_object_tracker + .add(to_delete.iter().map(|id| **id)); to_delete.len() } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 261b58ce0116..d3d91fa70c59 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::borrow::BorrowMut; use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::AtomicBool; @@ -23,7 +22,6 @@ use anyhow::Context; use arc_swap::ArcSwap; use bytes::Bytes; use fail::fail_point; -use function_name::named; use futures::future::Either; use futures::stream::{BoxStream, FuturesUnordered}; use futures::{FutureExt, StreamExt}; @@ -68,7 +66,6 @@ use rw_futures_util::{pending_on_none, select_all}; use thiserror_ext::AsReport; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::Sender; -use tokio::sync::RwLockWriteGuard; use tokio::task::JoinHandle; use tokio_stream::wrappers::IntervalStream; use tonic::Streaming; @@ -129,13 +126,14 @@ pub struct HummockManager { pub env: MetaSrvEnv, metadata_manager: MetadataManager, - /// Lock order: compaction, versioning, `compaction_group_manager`. - /// - Lock compaction first, then versioning, and finally `compaction_group_manager`. + /// Lock order: `compaction`, `versioning`, `compaction_group_manager`, `context_info` + /// - Lock `compaction` first, then `versioning`, then `compaction_group_manager` and finally `context_info`. /// - This order should be strictly followed to prevent deadlock. compaction: MonitoredRwLock, versioning: MonitoredRwLock, /// `CompactionGroupManager` manages compaction configs for compaction groups. - compaction_group_manager: tokio::sync::RwLock, + compaction_group_manager: MonitoredRwLock, + context_info: MonitoredRwLock, latest_snapshot: Snapshot, pub metrics: Arc, @@ -143,6 +141,8 @@ pub struct HummockManager { pub compactor_manager: CompactorManagerRef, event_sender: HummockManagerEventSender, + delete_object_tracker: DeleteObjectTracker, + object_store: ObjectStoreRef, version_checkpoint_path: String, version_archive_dir: String, @@ -174,21 +174,6 @@ pub struct HummockManager { pub type HummockManagerRef = Arc; -/// Acquire read lock of the lock with `lock_name`. -/// The macro will use macro `function_name` to get the name of the function of method that calls -/// the lock, and therefore, anyone to call this macro should ensured that the caller method has the -/// macro #[named] -macro_rules! read_lock { - ($hummock_mgr:expr, $lock_name:ident) => { - async { - $hummock_mgr - .$lock_name - .read(&[function_name!(), stringify!($lock_name), "read"]) - .await - } - }; -} -pub(crate) use read_lock; use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGroupId}; use risingwave_hummock_sdk::table_stats::{ add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap, @@ -198,28 +183,12 @@ use risingwave_pb::catalog::Table; use risingwave_pb::hummock::level_handler::RunningCompactTask; use risingwave_pb::meta::relation::RelationInfo; -/// Acquire write lock of the lock with `lock_name`. -/// The macro will use macro `function_name` to get the name of the function of method that calls -/// the lock, and therefore, anyone to call this macro should ensured that the caller method has the -/// macro #[named] -macro_rules! write_lock { - ($hummock_mgr:expr, $lock_name:ident) => { - async { - $hummock_mgr - .$lock_name - .write(&[function_name!(), stringify!($lock_name), "write"]) - .await - } - }; -} -pub(crate) use write_lock; - macro_rules! start_measure_real_process_timer { - ($hummock_mgr:expr) => { + ($hummock_mgr:expr, $func_name:literal) => { $hummock_mgr .metrics .hummock_manager_real_process_time - .with_label_values(&[function_name!()]) + .with_label_values(&[$func_name]) .start_timer() }; } @@ -359,7 +328,7 @@ impl HummockManager { metadata_manager: MetadataManager, metrics: Arc, compactor_manager: CompactorManagerRef, - compaction_group_manager: tokio::sync::RwLock, + compaction_group_manager: CompactionGroupManager, compactor_streams_change_tx: UnboundedSender<( u32, Streaming, @@ -415,14 +384,25 @@ impl HummockManager { versioning: MonitoredRwLock::new( metrics.hummock_manager_lock_time.clone(), Default::default(), + "hummock_manager::versioning", ), compaction: MonitoredRwLock::new( metrics.hummock_manager_lock_time.clone(), Default::default(), + "hummock_manager::compaction", + ), + compaction_group_manager: MonitoredRwLock::new( + metrics.hummock_manager_lock_time.clone(), + compaction_group_manager, + "hummock_manager::compaction_group_manager", + ), + context_info: MonitoredRwLock::new( + metrics.hummock_manager_lock_time.clone(), + Default::default(), + "hummock_manager::context_info", ), metrics, metadata_manager, - compaction_group_manager, // compaction_request_channel: parking_lot::RwLock::new(None), compactor_manager, latest_snapshot: ArcSwap::from_pointee(HummockSnapshot { @@ -430,6 +410,7 @@ impl HummockManager { current_epoch: INVALID_EPOCH, }), event_sender: tx, + delete_object_tracker: Default::default(), object_store, version_checkpoint_path, version_archive_dir, @@ -453,13 +434,14 @@ impl HummockManager { } /// Load state from meta store. - #[named] async fn load_meta_store_state(&self) -> Result<()> { - let mut compaction_guard = write_lock!(self, compaction).await; - let mut versioning_guard = write_lock!(self, versioning).await; + let mut compaction_guard = self.compaction.write().await; + let mut versioning_guard = self.versioning.write().await; + let mut context_info_guard = self.context_info.write().await; self.load_meta_store_state_impl( - compaction_guard.borrow_mut(), - versioning_guard.borrow_mut(), + &mut compaction_guard, + &mut versioning_guard, + &mut context_info_guard, ) .await } @@ -467,8 +449,9 @@ impl HummockManager { /// Load state from meta store. async fn load_meta_store_state_impl( &self, - compaction_guard: &mut RwLockWriteGuard<'_, Compaction>, - versioning_guard: &mut RwLockWriteGuard<'_, Versioning>, + compaction_guard: &mut Compaction, + versioning_guard: &mut Versioning, + context_info: &mut ContextInfo, ) -> Result<()> { use sea_orm::EntityTrait; let meta_store = self.meta_store_ref(); @@ -582,7 +565,7 @@ impl HummockManager { versioning_guard.current_version = redo_state; versioning_guard.hummock_version_deltas = hummock_version_deltas; - versioning_guard.pinned_versions = match &meta_store { + context_info.pinned_versions = match &meta_store { MetaStoreImpl::Kv(meta_store) => HummockPinnedVersion::list(meta_store) .await? .into_iter() @@ -597,7 +580,7 @@ impl HummockManager { .collect(), }; - versioning_guard.pinned_snapshots = match &meta_store { + context_info.pinned_snapshots = match &meta_store { MetaStoreImpl::Kv(meta_store) => HummockPinnedSnapshot::list(meta_store) .await? .into_iter() @@ -612,23 +595,32 @@ impl HummockManager { .collect(), }; - versioning_guard.objects_to_delete.clear(); + self.delete_object_tracker.clear(); // Not delete stale objects when archive is enabled if !self.env.opts.enable_hummock_data_archive { - versioning_guard.mark_objects_for_deletion(); + versioning_guard.mark_objects_for_deletion(context_info, &self.delete_object_tracker); } - self.initial_compaction_group_config_after_load(versioning_guard) - .await?; + self.initial_compaction_group_config_after_load( + versioning_guard, + self.compaction_group_manager.write().await.deref_mut(), + ) + .await?; Ok(()) } - /// Caller should hold `versioning` lock, to sync with `HummockManager::release_contexts`. - async fn check_context_with_meta_node(&self, context_id: HummockContextId) -> Result<()> { + async fn check_context_with_meta_node( + &self, + context_id: HummockContextId, + context_info: &ContextInfo, + ) -> Result<()> { if context_id == META_NODE_ID { // Using the preserved meta id is allowed. - } else if !self.check_context(context_id).await? { + } else if !context_info + .check_context(context_id, &self.metadata_manager) + .await? + { // The worker is not found in cluster. return Err(Error::InvalidContext(context_id)); } @@ -637,16 +629,16 @@ impl HummockManager { /// Pin the current greatest hummock version. The pin belongs to `context_id` /// and will be unpinned when `context_id` is invalidated. - #[named] pub async fn pin_version(&self, context_id: HummockContextId) -> Result { - let mut versioning_guard = write_lock!(self, versioning).await; - self.check_context_with_meta_node(context_id).await?; - let _timer = start_measure_real_process_timer!(self); - let versioning = versioning_guard.deref_mut(); + let versioning = self.versioning.read().await; + let mut context_info = self.context_info.write().await; + self.check_context_with_meta_node(context_id, &context_info) + .await?; + let _timer = start_measure_real_process_timer!(self, "pin_version"); let mut pinned_versions = create_trx_wrapper!( self.meta_store_ref(), BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning.pinned_versions,) + BTreeMapTransaction::new(&mut context_info.pinned_versions) ); let mut context_pinned_version = pinned_versions.new_entry_txn_or_default( context_id, @@ -662,12 +654,13 @@ impl HummockManager { { context_pinned_version.min_pinned_id = version_id; commit_multi_var!(self.meta_store_ref(), context_pinned_version)?; - trigger_pin_unpin_version_state(&self.metrics, &versioning.pinned_versions); + trigger_pin_unpin_version_state(&self.metrics, &context_info.pinned_versions); } #[cfg(test)] { - drop(versioning_guard); + drop(context_info); + drop(versioning); self.check_state_consistency().await; } @@ -677,20 +670,19 @@ impl HummockManager { /// Unpin all pins which belongs to `context_id` and has an id which is older than /// `unpin_before`. All versions >= `unpin_before` will be treated as if they are all pinned by /// this `context_id` so they will not be vacuumed. - #[named] pub async fn unpin_version_before( &self, context_id: HummockContextId, unpin_before: HummockVersionId, ) -> Result<()> { - let mut versioning_guard = write_lock!(self, versioning).await; - self.check_context_with_meta_node(context_id).await?; - let _timer = start_measure_real_process_timer!(self); - let versioning = versioning_guard.deref_mut(); + let mut context_info = self.context_info.write().await; + self.check_context_with_meta_node(context_id, &context_info) + .await?; + let _timer = start_measure_real_process_timer!(self, "unpin_version_before"); let mut pinned_versions = create_trx_wrapper!( self.meta_store_ref(), BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning.pinned_versions,) + BTreeMapTransaction::new(&mut context_info.pinned_versions,) ); let mut context_pinned_version = pinned_versions.new_entry_txn_or_default( context_id, @@ -707,26 +699,26 @@ impl HummockManager { ); context_pinned_version.min_pinned_id = unpin_before; commit_multi_var!(self.meta_store_ref(), context_pinned_version)?; - trigger_pin_unpin_version_state(&self.metrics, &versioning.pinned_versions); + trigger_pin_unpin_version_state(&self.metrics, &context_info.pinned_versions); #[cfg(test)] { - drop(versioning_guard); + drop(context_info); self.check_state_consistency().await; } Ok(()) } - #[named] pub async fn pin_specific_snapshot( &self, context_id: HummockContextId, epoch: HummockEpoch, ) -> Result { let snapshot = self.latest_snapshot.load(); - let mut guard = write_lock!(self, versioning).await; - self.check_context_with_meta_node(context_id).await?; + let mut guard = self.context_info.write().await; + self.check_context_with_meta_node(context_id, &guard) + .await?; let mut pinned_snapshots = create_trx_wrapper!( self.meta_store_ref(), BTreeMapTransactionWrapper, @@ -748,12 +740,12 @@ impl HummockManager { } /// Make sure `max_committed_epoch` is pinned and return it. - #[named] pub async fn pin_snapshot(&self, context_id: HummockContextId) -> Result { let snapshot = self.latest_snapshot.load(); - let mut guard = write_lock!(self, versioning).await; - self.check_context_with_meta_node(context_id).await?; - let _timer = start_measure_real_process_timer!(self); + let mut guard = self.context_info.write().await; + self.check_context_with_meta_node(context_id, &guard) + .await?; + let _timer = start_measure_real_process_timer!(self, "pin_snapshot"); let mut pinned_snapshots = create_trx_wrapper!( self.meta_store_ref(), BTreeMapTransactionWrapper, @@ -779,25 +771,25 @@ impl HummockManager { HummockSnapshot::clone(&snapshot) } - #[named] pub async fn unpin_snapshot(&self, context_id: HummockContextId) -> Result<()> { - let mut versioning_guard = write_lock!(self, versioning).await; - self.check_context_with_meta_node(context_id).await?; - let _timer = start_measure_real_process_timer!(self); + let mut context_info = self.context_info.write().await; + self.check_context_with_meta_node(context_id, &context_info) + .await?; + let _timer = start_measure_real_process_timer!(self, "unpin_snapshot"); let mut pinned_snapshots = create_trx_wrapper!( self.meta_store_ref(), BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning_guard.pinned_snapshots,) + BTreeMapTransaction::new(&mut context_info.pinned_snapshots,) ); let release_snapshot = pinned_snapshots.remove(context_id); if release_snapshot.is_some() { commit_multi_var!(self.meta_store_ref(), pinned_snapshots)?; - trigger_pin_unpin_snapshot_state(&self.metrics, &versioning_guard.pinned_snapshots); + trigger_pin_unpin_snapshot_state(&self.metrics, &context_info.pinned_snapshots); } #[cfg(test)] { - drop(versioning_guard); + drop(context_info); self.check_state_consistency().await; } @@ -805,18 +797,19 @@ impl HummockManager { } /// Unpin all snapshots smaller than specified epoch for current context. - #[named] pub async fn unpin_snapshot_before( &self, context_id: HummockContextId, hummock_snapshot: HummockSnapshot, ) -> Result<()> { - let mut versioning_guard = write_lock!(self, versioning).await; - self.check_context_with_meta_node(context_id).await?; - let _timer = start_measure_real_process_timer!(self); + let versioning = self.versioning.read().await; + let mut context_info = self.context_info.write().await; + self.check_context_with_meta_node(context_id, &context_info) + .await?; + let _timer = start_measure_real_process_timer!(self, "unpin_snapshot_before"); // Use the max_committed_epoch in storage as the snapshot ts so only committed changes are // visible in the snapshot. - let max_committed_epoch = versioning_guard.current_version.max_committed_epoch; + let max_committed_epoch = versioning.current_version.max_committed_epoch; // Ensure the unpin will not clean the latest one. let snapshot_committed_epoch = hummock_snapshot.committed_epoch; #[cfg(not(test))] @@ -828,7 +821,7 @@ impl HummockManager { let mut pinned_snapshots = create_trx_wrapper!( self.meta_store_ref(), BTreeMapTransactionWrapper, - BTreeMapTransaction::new(&mut versioning_guard.pinned_snapshots,) + BTreeMapTransaction::new(&mut context_info.pinned_snapshots,) ); let mut context_pinned_snapshot = pinned_snapshots.new_entry_txn_or_default( context_id, @@ -845,19 +838,19 @@ impl HummockManager { { context_pinned_snapshot.minimal_pinned_snapshot = last_read_epoch; commit_multi_var!(self.meta_store_ref(), context_pinned_snapshot)?; - trigger_pin_unpin_snapshot_state(&self.metrics, &versioning_guard.pinned_snapshots); + trigger_pin_unpin_snapshot_state(&self.metrics, &context_info.pinned_snapshots); } #[cfg(test)] { - drop(versioning_guard); + drop(context_info); + drop(versioning); self.check_state_consistency().await; } Ok(()) } - #[named] pub async fn get_compact_tasks_impl( &self, compaction_groups: Vec, @@ -874,17 +867,20 @@ impl HummockManager { .await .map_err(|err| Error::MetaStore(err.into()))?; - let mut compaction_guard = write_lock!(self, compaction).await; + let mut compaction_guard = self.compaction.write().await; let compaction = compaction_guard.deref_mut(); - let mut versioning_guard = write_lock!(self, versioning).await; + let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); - let _timer = start_measure_real_process_timer!(self); + let _timer = start_measure_real_process_timer!(self, "get_compact_tasks_impl"); let mut current_version = versioning.current_version.clone(); let start_time = Instant::now(); let max_committed_epoch = current_version.max_committed_epoch; - let watermark = versioning + let watermark = self + .context_info + .read() + .await .pinned_snapshots .values() .map(|v| v.minimal_pinned_snapshot) @@ -1407,9 +1403,8 @@ impl HummockManager { /// /// Return Ok(false) indicates either the task is not found, /// or the task is not owned by `context_id` when `context_id` is not None. - #[named] pub async fn report_compact_tasks(&self, report_tasks: Vec) -> Result> { - let mut guard = write_lock!(self, compaction).await; + let mut guard = self.compaction.write().await; let deterministic_mode = self.env.opts.compaction_deterministic_test; let compaction = guard.deref_mut(); let start_time = Instant::now(); @@ -1426,9 +1421,9 @@ impl HummockManager { BTreeMapTransaction::new(&mut compaction.compact_task_assignment,) ); // The compaction task is finished. - let mut versioning_guard = write_lock!(self, versioning).await; + let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); - let _timer = start_measure_real_process_timer!(self); + let _timer = start_measure_real_process_timer!(self, "report_compact_tasks"); let mut current_version = versioning.current_version.clone(); // purge stale compact_status @@ -1625,7 +1620,6 @@ impl HummockManager { } /// Caller should ensure `epoch` > `max_committed_epoch` - #[named] pub async fn commit_epoch( &self, epoch: HummockEpoch, @@ -1638,8 +1632,8 @@ impl HummockManager { new_table_fragment_info, change_log_delta, } = commit_info; - let mut versioning_guard = write_lock!(self, versioning).await; - let _timer = start_measure_real_process_timer!(self); + let mut versioning_guard = self.versioning.write().await; + let _timer = start_measure_real_process_timer!(self, "commit_epoch"); // Prevent commit new epochs if this flag is set if versioning_guard.disable_commit_epochs { return Ok(None); @@ -1980,87 +1974,94 @@ impl HummockManager { Ok(SstObjectIdRange::new(start_id, start_id + number as u64)) } - #[named] pub async fn get_min_pinned_version_id(&self) -> HummockVersionId { - read_lock!(self, versioning).await.min_pinned_version_id() + self.context_info.read().await.min_pinned_version_id() } - #[named] #[cfg(test)] pub async fn check_state_consistency(&self) { - let mut compaction_guard = write_lock!(self, compaction).await; - let mut versioning_guard = write_lock!(self, versioning).await; - let objects_to_delete = versioning_guard.objects_to_delete.clone(); + let mut compaction_guard = self.compaction.write().await; + let mut versioning_guard = self.versioning.write().await; + let mut context_info_guard = self.context_info.write().await; + let objects_to_delete = self.delete_object_tracker.current(); // We don't check `checkpoint` because it's allowed to update its in memory state without // persisting to object store. - let get_state = - |compaction_guard: &RwLockWriteGuard<'_, Compaction>, - versioning_guard: &RwLockWriteGuard<'_, Versioning>| { - let compact_statuses_copy = compaction_guard.compaction_statuses.clone(); - let compact_task_assignment_copy = compaction_guard.compact_task_assignment.clone(); - let pinned_versions_copy = versioning_guard.pinned_versions.clone(); - let pinned_snapshots_copy = versioning_guard.pinned_snapshots.clone(); - let hummock_version_deltas_copy = versioning_guard.hummock_version_deltas.clone(); - let version_stats_copy = versioning_guard.version_stats.clone(); - (( - compact_statuses_copy, - compact_task_assignment_copy, - pinned_versions_copy, - pinned_snapshots_copy, - hummock_version_deltas_copy, - version_stats_copy, - ),) - }; - let mem_state = get_state(&compaction_guard, &versioning_guard); + let get_state = |compaction_guard: &mut Compaction, + versioning_guard: &mut Versioning, + context_info_guard: &mut ContextInfo| { + let compact_statuses_copy = compaction_guard.compaction_statuses.clone(); + let compact_task_assignment_copy = compaction_guard.compact_task_assignment.clone(); + let pinned_versions_copy = context_info_guard.pinned_versions.clone(); + let pinned_snapshots_copy = context_info_guard.pinned_snapshots.clone(); + let hummock_version_deltas_copy = versioning_guard.hummock_version_deltas.clone(); + let version_stats_copy = versioning_guard.version_stats.clone(); + (( + compact_statuses_copy, + compact_task_assignment_copy, + pinned_versions_copy, + pinned_snapshots_copy, + hummock_version_deltas_copy, + version_stats_copy, + ),) + }; + let mem_state = get_state( + &mut compaction_guard, + &mut versioning_guard, + &mut context_info_guard, + ); self.load_meta_store_state_impl( - compaction_guard.borrow_mut(), - versioning_guard.borrow_mut(), + &mut compaction_guard, + &mut versioning_guard, + &mut context_info_guard, ) .await .expect("Failed to load state from meta store"); - let loaded_state = get_state(&compaction_guard, &versioning_guard); + let loaded_state = get_state( + &mut compaction_guard, + &mut versioning_guard, + &mut context_info_guard, + ); assert_eq!( mem_state, loaded_state, "hummock in-mem state is inconsistent with meta store state", ); - versioning_guard.objects_to_delete = objects_to_delete; + self.delete_object_tracker.clear(); + self.delete_object_tracker + .add(objects_to_delete.into_iter()); } /// Gets current version without pinning it. /// Should not be called inside [`HummockManager`], because it requests locks internally. /// /// Note: this method can hurt performance because it will clone a large object. - #[named] pub async fn get_current_version(&self) -> HummockVersion { - read_lock!(self, versioning).await.current_version.clone() + self.versioning.read().await.current_version.clone() } - #[named] pub async fn get_current_max_committed_epoch(&self) -> HummockEpoch { - read_lock!(self, versioning) + self.versioning + .read() .await .current_version .max_committed_epoch } - #[named] /// Gets the mapping from table id to compaction group id pub async fn get_table_compaction_group_id_mapping( &self, ) -> HashMap { - get_table_compaction_group_id_mapping(&read_lock!(self, versioning).await.current_version) + get_table_compaction_group_id_mapping(&self.versioning.read().await.current_version) } /// Get version deltas from meta store #[cfg_attr(coverage, coverage(off))] - #[named] pub async fn list_version_deltas( &self, start_id: u64, num_limit: u32, committed_epoch_limit: HummockEpoch, ) -> Result> { - let versioning = read_lock!(self, versioning).await; + let versioning = self.versioning.read().await; let version_deltas = versioning .hummock_version_deltas .range(start_id..) @@ -2127,12 +2128,11 @@ impl HummockManager { /// Replay a version delta to current hummock version. /// Returns the `version_id`, `max_committed_epoch` of the new version and the modified /// compaction groups - #[named] pub async fn replay_version_delta( &self, mut version_delta: HummockVersionDelta, ) -> Result<(HummockVersion, Vec)> { - let mut versioning_guard = write_lock!(self, versioning).await; + let mut versioning_guard = self.versioning.write().await; // ensure the version id is ascending after replay version_delta.id = versioning_guard.current_version.id + 1; version_delta.prev_id = version_delta.id - 1; @@ -2145,9 +2145,8 @@ impl HummockManager { Ok((version_new, compaction_group_ids)) } - #[named] pub async fn disable_commit_epoch(&self) -> HummockVersion { - let mut versioning_guard = write_lock!(self, versioning).await; + let mut versioning_guard = self.versioning.write().await; versioning_guard.disable_commit_epochs = true; versioning_guard.current_version.clone() } @@ -2271,18 +2270,16 @@ impl HummockManager { } #[cfg(any(test, feature = "test"))] - #[named] pub async fn compaction_task_from_assignment_for_test( &self, task_id: u64, ) -> Option { - let compaction_guard = read_lock!(self, compaction).await; + let compaction_guard = self.compaction.read().await; let assignment_ref = &compaction_guard.compact_task_assignment; assignment_ref.get(&task_id).cloned() } #[cfg(any(test, feature = "test"))] - #[named] pub async fn report_compact_task_for_test( &self, task_id: u64, @@ -2292,7 +2289,7 @@ impl HummockManager { table_stats_change: Option, ) -> Result<()> { if let Some(task) = compact_task { - let mut guard = write_lock!(self, compaction).await; + let mut guard = self.compaction.write().await; guard.compact_task_assignment.insert( task_id, CompactTaskAssignment { @@ -2357,10 +2354,7 @@ impl HummockManager { .notify_frontend_without_version(Operation::Update, Info::HummockStats(stats.clone())); } - #[named] pub fn hummock_timer_task(hummock_manager: Arc) -> (JoinHandle<()>, Sender<()>) { - use futures::{FutureExt, StreamExt}; - let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); let join_handle = tokio::spawn(async move { const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300; @@ -2525,7 +2519,7 @@ impl HummockManager { HummockTimerEvent::Report => { let (current_version, id_to_config, version_stats) = { let versioning_guard = - read_lock!(hummock_manager.as_ref(), versioning).await; + hummock_manager.versioning.read().await; let configs = hummock_manager.get_compaction_group_map().await; @@ -2669,12 +2663,11 @@ impl HummockManager { (join_handle, shutdown_tx) } - #[named] pub async fn check_dead_task(&self) { const MAX_COMPACTION_L0_MULTIPLIER: u64 = 32; const MAX_COMPACTION_DURATION_SEC: u64 = 20 * 60; let (groups, configs) = { - let versioning_guard = read_lock!(self, versioning).await; + let versioning_guard = self.versioning.read().await; let g = versioning_guard .current_version .levels @@ -2713,7 +2706,7 @@ impl HummockManager { } let mut pending_tasks: HashMap = HashMap::default(); { - let compaction_guard = read_lock!(self, compaction).await; + let compaction_guard = self.compaction.read().await; for group_id in slowdown_groups.keys() { if let Some(status) = compaction_guard.compaction_statuses.get(group_id) { for (idx, level_handler) in status.level_handlers.iter().enumerate() { @@ -3192,15 +3185,13 @@ impl HummockManager { async fn initial_compaction_group_config_after_load( &self, - versioning_guard: &mut RwLockWriteGuard<'_, Versioning>, + versioning_guard: &Versioning, + compaction_group_manager: &mut CompactionGroupManager, ) -> Result<()> { // 1. Due to version compatibility, we fix some of the configuration of older versions after hummock starts. let current_version = &versioning_guard.current_version; let all_group_ids = get_compaction_group_ids(current_version); - let mut configs = self - .compaction_group_manager - .write() - .await + let mut configs = compaction_group_manager .get_or_insert_compaction_group_configs(&all_group_ids.collect_vec()) .await?; @@ -3240,10 +3231,7 @@ impl HummockManager { tracing::info!("Compaction group {:?} configs rewrite ", rewrite_cg_ids); // update meta store - let result = self - .compaction_group_manager - .write() - .await + let result = compaction_group_manager .update_compaction_config( &rewrite_cg_ids, &[ @@ -3260,10 +3248,13 @@ impl HummockManager { } } - versioning_guard.write_limit = + compaction_group_manager.write_limit = calc_new_write_limits(configs, HashMap::new(), &versioning_guard.current_version); - trigger_write_stop_stats(&self.metrics, &versioning_guard.write_limit); - tracing::debug!("Hummock stopped write: {:#?}", versioning_guard.write_limit); + trigger_write_stop_stats(&self.metrics, &compaction_group_manager.write_limit); + tracing::debug!( + "Hummock stopped write: {:#?}", + compaction_group_manager.write_limit + ); { // 2. Restore the memory data structure according to the memory of the compaction group config. @@ -3276,14 +3267,13 @@ impl HummockManager { Ok(()) } - #[named] pub async fn list_change_log_epochs( &self, table_id: u32, min_epoch: u64, max_count: u32, ) -> Vec { - let versioning = read_lock!(self, versioning).await; + let versioning = self.versioning.read().await; if let Some(table_change_log) = versioning .current_version .table_change_log @@ -3443,6 +3433,8 @@ use tokio::sync::mpsc::error::SendError; use super::compaction::CompactionSelector; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; +use crate::hummock::manager::context::ContextInfo; +use crate::hummock::manager::gc::DeleteObjectTracker; use crate::hummock::sequence::next_sstable_object_id; #[derive(Debug, Default)] diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 3af055574e18..160f9518f648 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -588,7 +588,7 @@ async fn test_hummock_manager_basic() { ); } // objects_to_delete is always empty because no compaction is ever invoked. - assert!(hummock_manager.get_objects_to_delete().await.is_empty()); + assert!(hummock_manager.get_objects_to_delete().is_empty()); assert_eq!( hummock_manager .delete_version_deltas(usize::MAX) @@ -600,7 +600,7 @@ async fn test_hummock_manager_basic() { hummock_manager.create_version_checkpoint(1).await.unwrap(), commit_log_count + register_log_count ); - assert!(hummock_manager.get_objects_to_delete().await.is_empty()); + assert!(hummock_manager.get_objects_to_delete().is_empty()); assert_eq!( hummock_manager .delete_version_deltas(usize::MAX) @@ -1125,7 +1125,7 @@ async fn test_extend_objects_to_delete() { .map(|s| s.get_object_id()) .chain(max_committed_object_id + 1..=max_committed_object_id + orphan_sst_num) .collect_vec(); - assert!(hummock_manager.get_objects_to_delete().await.is_empty()); + assert!(hummock_manager.get_objects_to_delete().is_empty()); assert_eq!( hummock_manager .extend_objects_to_delete_from_scan(&all_object_ids) @@ -1133,7 +1133,7 @@ async fn test_extend_objects_to_delete() { orphan_sst_num as usize ); assert_eq!( - hummock_manager.get_objects_to_delete().await.len(), + hummock_manager.get_objects_to_delete().len(), orphan_sst_num as usize ); @@ -1143,7 +1143,7 @@ async fn test_extend_objects_to_delete() { 6 ); assert_eq!( - hummock_manager.get_objects_to_delete().await.len(), + hummock_manager.get_objects_to_delete().len(), orphan_sst_num as usize ); // since version1 is still pinned, the sst removed in compaction can not be reclaimed. @@ -1153,10 +1153,10 @@ async fn test_extend_objects_to_delete() { .await, orphan_sst_num as usize ); - let objects_to_delete = hummock_manager.get_objects_to_delete().await; + let objects_to_delete = hummock_manager.get_objects_to_delete(); assert_eq!(objects_to_delete.len(), orphan_sst_num as usize); let pinned_version2: HummockVersion = hummock_manager.pin_version(context_id).await.unwrap(); - let objects_to_delete = hummock_manager.get_objects_to_delete().await; + let objects_to_delete = hummock_manager.get_objects_to_delete(); assert_eq!( objects_to_delete.len(), orphan_sst_num as usize, @@ -1167,7 +1167,7 @@ async fn test_extend_objects_to_delete() { .unpin_version_before(context_id, pinned_version2.id) .await .unwrap(); - let objects_to_delete = hummock_manager.get_objects_to_delete().await; + let objects_to_delete = hummock_manager.get_objects_to_delete(); assert_eq!( objects_to_delete.len(), orphan_sst_num as usize, @@ -1182,7 +1182,7 @@ async fn test_extend_objects_to_delete() { .await, orphan_sst_num as usize ); - let objects_to_delete = hummock_manager.get_objects_to_delete().await; + let objects_to_delete = hummock_manager.get_objects_to_delete(); assert_eq!(objects_to_delete.len(), orphan_sst_num as usize); let new_epoch = pinned_version2.max_committed_epoch.next_epoch(); hummock_manager @@ -1206,7 +1206,7 @@ async fn test_extend_objects_to_delete() { .await, orphan_sst_num as usize + 3 ); - let objects_to_delete = hummock_manager.get_objects_to_delete().await; + let objects_to_delete = hummock_manager.get_objects_to_delete(); assert_eq!(objects_to_delete.len(), orphan_sst_num as usize + 3); } diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 7b99c77fd881..464404b8f25c 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -15,7 +15,6 @@ use std::cmp; use std::collections::{BTreeMap, HashMap, HashSet}; -use function_name::named; use itertools::Itertools; use risingwave_common::util::epoch::INVALID_EPOCH; use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{ @@ -38,8 +37,10 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation}; use super::check_cg_write_limit; use crate::hummock::error::Result; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; +use crate::hummock::manager::context::ContextInfo; +use crate::hummock::manager::gc::DeleteObjectTracker; use crate::hummock::manager::worker::{HummockManagerEvent, HummockManagerEventSender}; -use crate::hummock::manager::{commit_multi_var, create_trx_wrapper, read_lock, write_lock}; +use crate::hummock::manager::{commit_multi_var, create_trx_wrapper}; use crate::hummock::metrics_utils::{ trigger_safepoint_stat, trigger_write_stop_stats, LocalTableMetrics, }; @@ -71,30 +72,21 @@ impl Drop for HummockVersionSafePoint { #[derive(Default)] pub struct Versioning { // Volatile states below - /// Avoide commit epoch epochs + /// Avoid commit epoch epochs /// Don't persist compaction version delta to meta store pub disable_commit_epochs: bool, /// Latest hummock version pub current_version: HummockVersion, - /// Objects that waits to be deleted from object store. It comes from either compaction, or - /// full GC (listing object store). - pub objects_to_delete: HashSet, - /// `version_safe_points` is similar to `pinned_versions` expect for being a transient state. - pub version_safe_points: Vec, - /// Tables that write limit is trigger for. - pub write_limit: HashMap, + pub local_metrics: HashMap, // Persistent states below pub hummock_version_deltas: BTreeMap, - pub pinned_versions: BTreeMap, - pub pinned_snapshots: BTreeMap, /// Stats for latest hummock version. pub version_stats: HummockVersionStats, pub checkpoint: HummockVersionCheckpoint, - pub local_metrics: HashMap, } -impl Versioning { +impl ContextInfo { pub fn min_pinned_version_id(&self) -> HummockVersionId { let mut min_pinned_version_id = HummockVersionId::MAX; for id in self @@ -107,24 +99,30 @@ impl Versioning { } min_pinned_version_id } +} +impl Versioning { /// Marks all objects <= `min_pinned_version_id` for deletion. - pub(super) fn mark_objects_for_deletion(&mut self) { - let min_pinned_version_id = self.min_pinned_version_id(); - self.objects_to_delete.extend( + pub(super) fn mark_objects_for_deletion( + &self, + context_info: &ContextInfo, + delete_object_tracker: &DeleteObjectTracker, + ) { + let min_pinned_version_id = context_info.min_pinned_version_id(); + delete_object_tracker.add( self.checkpoint .stale_objects .iter() .filter(|(version_id, _)| **version_id <= min_pinned_version_id) - .flat_map(|(_, stale_objects)| stale_objects.id.clone()), + .flat_map(|(_, stale_objects)| stale_objects.id.iter().cloned()), ); } } impl HummockManager { - #[named] pub async fn list_pinned_version(&self) -> Vec { - read_lock!(self, versioning) + self.context_info + .read() .await .pinned_versions .values() @@ -132,9 +130,9 @@ impl HummockManager { .collect_vec() } - #[named] pub async fn list_pinned_snapshot(&self) -> Vec { - read_lock!(self, versioning) + self.context_info + .read() .await .pinned_snapshots .values() @@ -159,16 +157,15 @@ impl HummockManager { Ok(workers) } - #[named] pub async fn get_version_stats(&self) -> HummockVersionStats { - read_lock!(self, versioning).await.version_stats.clone() + self.versioning.read().await.version_stats.clone() } - #[named] pub async fn register_safe_point(&self) -> HummockVersionSafePoint { - let mut wl = write_lock!(self, versioning).await; + let versioning = self.versioning.read().await; + let mut wl = self.context_info.write().await; let safe_point = HummockVersionSafePoint { - id: wl.current_version.id, + id: versioning.current_version.id, event_sender: self.event_sender.clone(), }; wl.version_safe_points.push(safe_point.id); @@ -176,9 +173,8 @@ impl HummockManager { safe_point } - #[named] pub async fn unregister_safe_point(&self, safe_point: HummockVersionId) { - let mut wl = write_lock!(self, versioning).await; + let mut wl = self.context_info.write().await; let version_safe_points = &mut wl.version_safe_points; if let Some(pos) = version_safe_points.iter().position(|sp| *sp == safe_point) { version_safe_points.remove(pos); @@ -189,41 +185,40 @@ impl HummockManager { /// Updates write limits for `target_groups` and sends notification. /// Returns true if `write_limit` has been modified. /// The implementation acquires `versioning` lock and `compaction_group_manager` lock. - #[named] pub(super) async fn try_update_write_limits( &self, target_group_ids: &[CompactionGroupId], ) -> bool { - let mut guard = write_lock!(self, versioning).await; - let config_mgr = self.compaction_group_manager.read().await; + let versioning = self.versioning.read().await; + let mut cg_manager = self.compaction_group_manager.write().await; let target_group_configs = target_group_ids .iter() .filter_map(|id| { - config_mgr + cg_manager .try_get_compaction_group_config(*id) .map(|config| (*id, config)) }) .collect(); let mut new_write_limits = calc_new_write_limits( target_group_configs, - guard.write_limit.clone(), - &guard.current_version, + cg_manager.write_limit.clone(), + &versioning.current_version, ); let all_group_ids: HashSet<_> = - HashSet::from_iter(get_compaction_group_ids(&guard.current_version)); + HashSet::from_iter(get_compaction_group_ids(&versioning.current_version)); new_write_limits.retain(|group_id, _| all_group_ids.contains(group_id)); - if new_write_limits == guard.write_limit { + if new_write_limits == cg_manager.write_limit { return false; } tracing::debug!("Hummock stopped write is updated: {:#?}", new_write_limits); trigger_write_stop_stats(&self.metrics, &new_write_limits); - guard.write_limit = new_write_limits; + cg_manager.write_limit = new_write_limits; self.env .notification_manager() .notify_hummock_without_version( Operation::Add, Info::HummockWriteLimits(risingwave_pb::hummock::WriteLimits { - write_limits: guard.write_limit.clone(), + write_limits: cg_manager.write_limit.clone(), }), ); true @@ -231,22 +226,19 @@ impl HummockManager { /// Gets write limits. /// The implementation acquires `versioning` lock. - #[named] pub async fn write_limits(&self) -> HashMap { - let guard = read_lock!(self, versioning).await; + let guard = self.compaction_group_manager.read().await; guard.write_limit.clone() } - #[named] pub async fn list_branched_objects(&self) -> BTreeMap { - let guard = read_lock!(self, versioning).await; + let guard = self.versioning.read().await; guard.current_version.build_branched_sst_info() } - #[named] pub async fn rebuild_table_stats(&self) -> Result<()> { use crate::model::ValTransaction; - let mut versioning = write_lock!(self, versioning).await; + let mut versioning = self.versioning.write().await; let new_stats = rebuild_table_stats(&versioning.current_version); let mut version_stats = create_trx_wrapper!( self.meta_store_ref(), @@ -370,29 +362,30 @@ mod tests { }; use crate::hummock::compaction::compaction_config::CompactionConfigBuilder; + use crate::hummock::manager::context::ContextInfo; use crate::hummock::manager::versioning::{ - calc_new_write_limits, estimate_table_stats, rebuild_table_stats, Versioning, + calc_new_write_limits, estimate_table_stats, rebuild_table_stats, }; use crate::hummock::model::CompactionGroup; #[test] fn test_min_pinned_version_id() { - let mut versioning = Versioning::default(); - assert_eq!(versioning.min_pinned_version_id(), HummockVersionId::MAX); - versioning.pinned_versions.insert( + let mut context_info = ContextInfo::default(); + assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX); + context_info.pinned_versions.insert( 1, HummockPinnedVersion { context_id: 1, min_pinned_id: 10, }, ); - assert_eq!(versioning.min_pinned_version_id(), 10); - versioning.version_safe_points.push(5); - assert_eq!(versioning.min_pinned_version_id(), 5); - versioning.version_safe_points.clear(); - assert_eq!(versioning.min_pinned_version_id(), 10); - versioning.pinned_versions.clear(); - assert_eq!(versioning.min_pinned_version_id(), HummockVersionId::MAX); + assert_eq!(context_info.min_pinned_version_id(), 10); + context_info.version_safe_points.push(5); + assert_eq!(context_info.min_pinned_version_id(), 5); + context_info.version_safe_points.clear(); + assert_eq!(context_info.min_pinned_version_id(), 10); + context_info.pinned_versions.clear(); + assert_eq!(context_info.min_pinned_version_id(), HummockVersionId::MAX); } #[test] diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 9e45fa5a2bcc..6cde13507836 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -95,7 +95,7 @@ impl VacuumManager { pending_object_ids } else { // 2. If no pending SST objects, then fetch new ones. - let mut objects_to_delete = self.hummock_manager.get_objects_to_delete().await; + let mut objects_to_delete = self.hummock_manager.get_objects_to_delete(); self.filter_out_pinned_ssts(&mut objects_to_delete).await?; if objects_to_delete.is_empty() { return Ok(vec![]); @@ -237,13 +237,13 @@ mod tests { assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 6); assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0); - assert!(hummock_manager.get_objects_to_delete().await.is_empty()); + assert!(hummock_manager.get_objects_to_delete().is_empty()); hummock_manager .unpin_version_before(context_id, HummockVersionId::MAX) .await .unwrap(); hummock_manager.create_version_checkpoint(0).await.unwrap(); - assert!(!hummock_manager.get_objects_to_delete().await.is_empty()); + assert!(!hummock_manager.get_objects_to_delete().is_empty()); // No SST deletion is scheduled because no available worker. assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0); let _receiver = compactor_manager.add_compactor(context_id); diff --git a/src/meta/src/rpc/metrics.rs b/src/meta/src/rpc/metrics.rs index 2df847516c4a..28520720e98f 100644 --- a/src/meta/src/rpc/metrics.rs +++ b/src/meta/src/rpc/metrics.rs @@ -449,7 +449,7 @@ impl MetaMetrics { let hummock_manager_lock_time = register_histogram_vec_with_registry!( "hummock_manager_lock_time", "latency for hummock manager to acquire the rwlock", - &["method", "lock_name", "lock_type"], + &["lock_name", "lock_type"], registry ) .unwrap();