From 7d46ac880266658a0916a05f79d3b2e5f9096dc0 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Mon, 11 Nov 2024 15:18:46 +0800 Subject: [PATCH] refactor: list and delete objects sequentially --- src/meta/service/src/hummock_service.rs | 19 +-- src/meta/src/hummock/manager/gc.rs | 89 ++++---------- src/meta/src/hummock/manager/mod.rs | 7 +- src/meta/src/hummock/manager/tests.rs | 87 +++---------- src/meta/src/hummock/manager/timer_task.rs | 27 ++-- src/meta/src/hummock/manager/utils.rs | 4 - src/meta/src/hummock/mod.rs | 31 ----- src/storage/hummock_test/src/lib.rs | 2 - src/storage/hummock_test/src/vacuum_tests.rs | 123 ------------------- 9 files changed, 69 insertions(+), 320 deletions(-) delete mode 100644 src/storage/hummock_test/src/vacuum_tests.rs diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 5cd5fdfe63145..f9a556009999e 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -239,13 +239,18 @@ impl HummockManagerService for HummockServiceImpl { request: Request, ) -> Result, Status> { let req = request.into_inner(); - self.hummock_manager - .start_full_gc( - Duration::from_secs(req.sst_retention_time_sec), - req.prefix, - Some(self.backup_manager.clone()), - ) - .await?; + let backup_manager_2 = self.backup_manager.clone(); + let hummock_manager_2 = self.hummock_manager.clone(); + tokio::task::spawn(async move { + let _ = hummock_manager_2 + .start_full_gc( + Duration::from_secs(req.sst_retention_time_sec), + req.prefix, + Some(backup_manager_2), + ) + .await + .inspect_err(|e| tracing::warn!(?e, "Failed to start GC.")); + }); Ok(Response::new(TriggerFullGcResponse { status: None })) } diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index b8817b144f131..9f35082ca1eb5 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -23,7 +23,6 @@ use chrono::DateTime; use futures::future::try_join_all; use futures::{future, StreamExt, TryStreamExt}; use itertools::Itertools; -use parking_lot::Mutex; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::util::epoch::Epoch; use risingwave_hummock_sdk::{ @@ -45,34 +44,6 @@ use crate::manager::MetadataManager; use crate::model::BTreeMapTransaction; use crate::MetaResult; -#[derive(Default)] -pub(super) struct DeleteObjectTracker { - /// Objects that waits to be deleted from object store. It comes from either compaction, or - /// full GC (listing object store). - objects_to_delete: Mutex>, -} - -impl DeleteObjectTracker { - pub(super) fn add(&self, objects: impl Iterator) { - self.objects_to_delete.lock().extend(objects) - } - - pub(super) fn current(&self) -> HashSet { - self.objects_to_delete.lock().clone() - } - - pub(super) fn clear(&self) { - self.objects_to_delete.lock().clear(); - } - - pub(super) fn ack<'a>(&self, objects: impl Iterator) { - let mut lock = self.objects_to_delete.lock(); - for object in objects { - lock.remove(object); - } - } -} - pub(crate) struct GcManager { store: ObjectStoreRef, path_prefix: String, @@ -180,21 +151,11 @@ impl GcManager { } impl HummockManager { - /// Gets SST objects that is safe to be deleted from object store. - pub fn get_objects_to_delete(&self) -> Vec { - self.delete_object_tracker - .current() - .iter() - .cloned() - .collect() - } - /// Acknowledges SSTs have been deleted from object store. pub async fn ack_deleted_objects( &self, object_ids: &HashSet, ) -> Result<()> { - self.delete_object_tracker.ack(object_ids.iter()); let mut versioning_guard = self.versioning.write().await; for stale_objects in versioning_guard.checkpoint.stale_objects.values_mut() { stale_objects.id.retain(|id| !object_ids.contains(id)); @@ -246,11 +207,11 @@ impl HummockManager { Ok((batch.len(), deltas_to_delete.len() - batch.len())) } - /// Extends `objects_to_delete` according to object store list result. - pub async fn extend_objects_to_delete( + /// Filters by Hummock version and Writes GC history. + pub async fn finalize_objects_to_delete( &self, - object_ids: &[HummockSstableObjectId], - ) -> Result { + object_ids: impl Iterator + Clone, + ) -> Result> { // This lock ensures `commit_epoch` and `report_compat_task` can see the latest GC history during sanity check. let versioning = self.versioning.read().await; let tracked_object_ids: HashSet = { @@ -277,17 +238,13 @@ impl HummockManager { ); tracked_object_ids }; - let to_delete = object_ids - .iter() - .filter(|object_id| !tracked_object_ids.contains(object_id)) - .copied() - .collect_vec(); - let to_delete_num = to_delete.len(); - self.write_gc_history(to_delete.iter().cloned()).await?; - self.delete_object_tracker.add(to_delete.into_iter()); - Ok(to_delete_num) + let to_delete = object_ids.filter(|object_id| !tracked_object_ids.contains(object_id)); + self.write_gc_history(to_delete.clone()).await?; + Ok(to_delete.collect()) } + /// LIST object store and DELETE stale objects, in batches. + /// GC can be very slow. Spawn a dedicated tokio task for it. pub async fn start_full_gc( &self, sst_retention_time: Duration, @@ -315,14 +272,14 @@ impl HummockManager { let mut total_object_size = 0; tracing::info!( retention_sec = sst_retention_time.as_secs(), - prefix = prefix.as_ref().unwrap_or(&String::from("")), + prefix, limit, "Start GC." ); loop { tracing::debug!( retention_sec = sst_retention_time.as_secs(), - prefix = prefix.as_ref().unwrap_or(&String::from("")), + prefix, start_after, limit, "Start a GC batch." @@ -409,19 +366,23 @@ impl HummockManager { .collect_vec(); let after_metadata_backup = object_ids.len(); // filter by version - let after_version = self.extend_objects_to_delete(&object_ids).await?; + let after_version = self + .finalize_objects_to_delete(object_ids.into_iter()) + .await?; + let after_version_count = after_version.len(); metrics .full_gc_selected_object_count - .observe(after_version as _); + .observe(after_version_count as _); tracing::info!( candidate_object_number, after_min_sst_id, after_time_travel, after_metadata_backup, - after_version, + after_version_count, "complete gc batch" ); - Ok(after_version) + self.delete_objects(after_version).await?; + Ok(after_version_count) } pub async fn now(&self) -> Result { @@ -531,12 +492,10 @@ impl HummockManager { /// Deletes stale SST objects from object store. /// /// Returns the total count of deleted SST objects. - pub async fn delete_objects(&self) -> MetaResult { - // Select SST objects to delete. - let mut objects_to_delete = self.get_objects_to_delete(); - if objects_to_delete.is_empty() { - return Ok(0); - } + pub async fn delete_objects( + &self, + mut objects_to_delete: Vec, + ) -> Result { let total = objects_to_delete.len(); let mut batch_size = 1000usize; while !objects_to_delete.is_empty() { @@ -555,7 +514,7 @@ impl HummockManager { .delete_objects(delete_batch.into_iter()) .await?; self.ack_deleted_objects(&deleted_object_ids).await?; - tracing::info!(?deleted_object_ids, "Finish deleting objects."); + tracing::debug!(?deleted_object_ids, "Finish deleting objects."); } Ok(total) } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index ff98529596d6e..2d16fc31c1049 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -42,7 +42,7 @@ use crate::hummock::compaction::CompactStatus; use crate::hummock::error::Result; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::context::ContextInfo; -use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState, GcManager}; +use crate::hummock::manager::gc::{FullGcState, GcManager}; use crate::hummock::CompactorManagerRef; use crate::manager::{MetaSrvEnv, MetadataManager}; use crate::model::{ClusterId, MetadataModelError}; @@ -91,9 +91,6 @@ pub struct HummockManager { pub compactor_manager: CompactorManagerRef, event_sender: HummockManagerEventSender, - - delete_object_tracker: DeleteObjectTracker, - object_store: ObjectStoreRef, version_checkpoint_path: String, version_archive_dir: String, @@ -275,7 +272,6 @@ impl HummockManager { // compaction_request_channel: parking_lot::RwLock::new(None), compactor_manager, event_sender: tx, - delete_object_tracker: Default::default(), object_store, version_checkpoint_path, version_archive_dir, @@ -412,7 +408,6 @@ impl HummockManager { .map(|m| (m.context_id as HummockContextId, m.into())) .collect(); - self.delete_object_tracker.clear(); self.initial_compaction_group_config_after_load( versioning_guard, self.compaction_group_manager.write().await.deref_mut(), diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 65913d81628a7..3f97eaacdbefd 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -567,8 +567,6 @@ async fn test_hummock_manager_basic() { init_version_id + commit_log_count + register_log_count, ); } - // objects_to_delete is always empty because no compaction is ever invoked. - assert!(hummock_manager.get_objects_to_delete().is_empty()); assert_eq!( hummock_manager .delete_version_deltas(usize::MAX) @@ -580,7 +578,6 @@ async fn test_hummock_manager_basic() { hummock_manager.create_version_checkpoint(1).await.unwrap(), commit_log_count + register_log_count ); - assert!(hummock_manager.get_objects_to_delete().is_empty()); assert_eq!( hummock_manager .delete_version_deltas(usize::MAX) @@ -1181,24 +1178,20 @@ async fn test_extend_objects_to_delete() { }) .max() .unwrap(); - let orphan_sst_num = 10; + let orphan_sst_num: usize = 10; let all_object_ids = sst_infos .iter() .flatten() .map(|s| s.object_id) - .chain(max_committed_object_id + 1..=max_committed_object_id + orphan_sst_num) + .chain(max_committed_object_id + 1..=max_committed_object_id + orphan_sst_num as u64) .collect_vec(); - assert!(hummock_manager.get_objects_to_delete().is_empty()); assert_eq!( hummock_manager - .extend_objects_to_delete(&all_object_ids) + .finalize_objects_to_delete(all_object_ids.clone().into_iter()) .await - .unwrap(), - orphan_sst_num as usize - ); - assert_eq!( - hummock_manager.get_objects_to_delete().len(), - orphan_sst_num as usize + .unwrap() + .len(), + orphan_sst_num ); // Checkpoint @@ -1206,50 +1199,30 @@ async fn test_extend_objects_to_delete() { hummock_manager.create_version_checkpoint(1).await.unwrap(), 6 ); - assert_eq!( - hummock_manager.get_objects_to_delete().len(), - orphan_sst_num as usize - ); // since version1 is still pinned, the sst removed in compaction can not be reclaimed. assert_eq!( hummock_manager - .extend_objects_to_delete(&all_object_ids) + .finalize_objects_to_delete(all_object_ids.clone().into_iter()) .await - .unwrap(), - orphan_sst_num as usize + .unwrap() + .len(), + orphan_sst_num ); - let objects_to_delete = hummock_manager.get_objects_to_delete(); - assert_eq!(objects_to_delete.len(), orphan_sst_num as usize); let pinned_version2: HummockVersion = hummock_manager.pin_version(context_id).await.unwrap(); - let objects_to_delete = hummock_manager.get_objects_to_delete(); - assert_eq!( - objects_to_delete.len(), - orphan_sst_num as usize, - "{:?}", - objects_to_delete - ); hummock_manager .unpin_version_before(context_id, pinned_version2.id) .await .unwrap(); - let objects_to_delete = hummock_manager.get_objects_to_delete(); - assert_eq!( - objects_to_delete.len(), - orphan_sst_num as usize, - "{:?}", - objects_to_delete - ); // version1 is unpin, but version2 is pinned, and version2 is the checkpoint version. // stale objects are combined in the checkpoint of version2, so no sst to reclaim assert_eq!( hummock_manager - .extend_objects_to_delete(&all_object_ids) + .finalize_objects_to_delete(all_object_ids.clone().into_iter()) .await - .unwrap(), - orphan_sst_num as usize + .unwrap() + .len(), + orphan_sst_num ); - let objects_to_delete = hummock_manager.get_objects_to_delete(); - assert_eq!(objects_to_delete.len(), orphan_sst_num as usize); let new_epoch = version_max_committed_epoch(&pinned_version2).next_epoch(); hummock_meta_client .commit_epoch( @@ -1272,13 +1245,12 @@ async fn test_extend_objects_to_delete() { // in the stale objects of version2 checkpoint assert_eq!( hummock_manager - .extend_objects_to_delete(&all_object_ids) + .finalize_objects_to_delete(all_object_ids.clone().into_iter()) .await - .unwrap(), - orphan_sst_num as usize + 3 + .unwrap() + .len(), + orphan_sst_num + 3 ); - let objects_to_delete = hummock_manager.get_objects_to_delete(); - assert_eq!(objects_to_delete.len(), orphan_sst_num as usize + 3); } #[tokio::test] @@ -2583,7 +2555,6 @@ async fn test_vacuum() { context_id, )); assert_eq!(hummock_manager.delete_metadata().await.unwrap(), 0); - assert_eq!(hummock_manager.delete_objects().await.unwrap(), 0); hummock_manager.pin_version(context_id).await.unwrap(); let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); let sst_infos = add_test_tables( @@ -2597,14 +2568,12 @@ async fn test_vacuum() { assert_eq!(hummock_manager.delete_metadata().await.unwrap(), 6); assert_eq!(hummock_manager.delete_metadata().await.unwrap(), 0); - assert!(hummock_manager.get_objects_to_delete().is_empty()); hummock_manager .unpin_version_before(context_id, HummockVersionId::MAX) .await .unwrap(); hummock_manager.create_version_checkpoint(0).await.unwrap(); - assert!(hummock_manager.get_objects_to_delete().is_empty()); - hummock_manager + let deleted = hummock_manager .complete_gc_batch( sst_infos .iter() @@ -2614,21 +2583,5 @@ async fn test_vacuum() { ) .await .unwrap(); - assert_eq!(hummock_manager.get_objects_to_delete().len(), 3); - assert_eq!( - hummock_manager - .get_objects_to_delete() - .into_iter() - .sorted() - .collect::>(), - sst_infos[0] - .iter() - .map(|s| s.object_id) - .sorted() - .collect::>() - ); - // No SST deletion is scheduled because no available worker. - assert_eq!(hummock_manager.delete_objects().await.unwrap(), 3); - // No objects_to_delete. - assert_eq!(hummock_manager.delete_objects().await.unwrap(), 0); + assert_eq!(deleted, 3); } diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index 7dc0e0b919325..e68c5e0ec0d08 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -437,21 +437,18 @@ impl HummockManager { HummockTimerEvent::FullGc => { let retention_sec = hummock_manager.env.opts.min_sst_retention_time_sec; - match hummock_manager - .start_full_gc( - Duration::from_secs(retention_sec), - None, - backup_manager.clone(), - ) - .await - { - Ok(_) => { - tracing::info!("Start periodic GC."); - } - Err(e) => { - tracing::error!(?e, "Failed to start GC."); - } - } + let backup_manager_2 = backup_manager.clone(); + let hummock_manager_2 = hummock_manager.clone(); + tokio::task::spawn(async move { + let _ = hummock_manager_2 + .start_full_gc( + Duration::from_secs(retention_sec), + None, + backup_manager_2, + ) + .await + .inspect_err(|e| warn!(?e, "Failed to start GC.")); + }); } } } diff --git a/src/meta/src/hummock/manager/utils.rs b/src/meta/src/hummock/manager/utils.rs index 1a8eb2d5d27ec..577e549961862 100644 --- a/src/meta/src/hummock/manager/utils.rs +++ b/src/meta/src/hummock/manager/utils.rs @@ -70,7 +70,6 @@ impl HummockManager { let mut compaction_guard = self.compaction.write().await; let mut versioning_guard = self.versioning.write().await; let mut context_info_guard = self.context_info.write().await; - let objects_to_delete = self.delete_object_tracker.current(); // We don't check `checkpoint` because it's allowed to update its in memory state without // persisting to object store. let get_state = |compaction_guard: &mut Compaction, @@ -110,9 +109,6 @@ impl HummockManager { mem_state, loaded_state, "hummock in-mem state is inconsistent with meta store state", ); - self.delete_object_tracker.clear(); - self.delete_object_tracker - .add(objects_to_delete.into_iter()); } pub async fn get_new_sst_ids(&self, number: u32) -> Result { diff --git a/src/meta/src/hummock/mod.rs b/src/meta/src/hummock/mod.rs index cdcc30612a90a..b9a4d3abba468 100644 --- a/src/meta/src/hummock/mod.rs +++ b/src/meta/src/hummock/mod.rs @@ -52,10 +52,6 @@ pub fn start_hummock_workers( hummock_manager.clone(), Duration::from_secs(meta_opts.vacuum_interval_sec), ), - start_vacuum_object_loop( - hummock_manager, - Duration::from_secs(meta_opts.vacuum_interval_sec), - ), ]; workers } @@ -87,33 +83,6 @@ pub fn start_vacuum_metadata_loop( (join_handle, shutdown_tx) } -/// Starts a task to periodically vacuum stale objects. -pub fn start_vacuum_object_loop( - vacuum: HummockManagerRef, - interval: Duration, -) -> (JoinHandle<()>, Sender<()>) { - let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); - let join_handle = tokio::spawn(async move { - let mut min_trigger_interval = tokio::time::interval(interval); - min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - loop { - tokio::select! { - // Wait for interval - _ = min_trigger_interval.tick() => {}, - // Shutdown vacuum - _ = &mut shutdown_rx => { - tracing::info!("Vacuum object loop is stopped"); - return; - } - } - if let Err(err) = vacuum.delete_objects().await { - tracing::warn!(error = %err.as_report(), "Vacuum object error"); - } - } - }); - (join_handle, shutdown_tx) -} - pub fn start_checkpoint_loop( hummock_manager: HummockManagerRef, interval: Duration, diff --git a/src/storage/hummock_test/src/lib.rs b/src/storage/hummock_test/src/lib.rs index a89ead17944f5..7cac9f756d9e0 100644 --- a/src/storage/hummock_test/src/lib.rs +++ b/src/storage/hummock_test/src/lib.rs @@ -26,8 +26,6 @@ mod snapshot_tests; mod state_store_tests; #[cfg(any(test, feature = "test"))] pub mod test_utils; -#[cfg(test)] -mod vacuum_tests; #[cfg(test)] mod hummock_read_version_tests; diff --git a/src/storage/hummock_test/src/vacuum_tests.rs b/src/storage/hummock_test/src/vacuum_tests.rs deleted file mode 100644 index 893d2a05dca9c..0000000000000 --- a/src/storage/hummock_test/src/vacuum_tests.rs +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::iter; -use std::ops::Sub; -use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -use futures::stream; -use itertools::Itertools; -use risingwave_meta::hummock::test_utils::setup_compute_env; -use risingwave_meta::hummock::MockHummockMetaClient; -use risingwave_object_store::object::ObjectMetadata; -use risingwave_pb::hummock::{FullScanTask, VacuumTask}; -use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; -use risingwave_storage::hummock::test_utils::{ - default_builder_opt_for_test, gen_default_test_sstable, -}; -use risingwave_storage::hummock::vacuum::Vacuum; - -#[tokio::test] -async fn test_vacuum() { - let sstable_store = mock_sstable_store().await; - // Put some SSTs to object store - let object_ids = (1..10).collect_vec(); - let mut sstables = vec![]; - for sstable_object_id in &object_ids { - let sstable = gen_default_test_sstable( - default_builder_opt_for_test(), - *sstable_object_id, - sstable_store.clone(), - ) - .await; - sstables.push(sstable); - } - - // Delete all existent SSTs and a nonexistent SSTs. Trying to delete a nonexistent SST is - // OK. - let nonexistent_id = 11u64; - let vacuum_task = VacuumTask { - sstable_object_ids: object_ids - .into_iter() - .chain(iter::once(nonexistent_id)) - .collect_vec(), - }; - let (_env, hummock_manager_ref, _cluster_ctl_ref, worker_id) = setup_compute_env(8080).await; - let mock_hummock_meta_client = Arc::new(MockHummockMetaClient::new( - hummock_manager_ref.clone(), - worker_id as _, - )); - Vacuum::handle_vacuum_task(sstable_store, &vacuum_task.sstable_object_ids) - .await - .unwrap(); - assert!(Vacuum::report_vacuum_task(vacuum_task, mock_hummock_meta_client).await); -} - -#[tokio::test] -async fn test_full_scan() { - let (_env, hummock_manager_ref, _cluster_ctl_ref, worker_id) = setup_compute_env(8080).await; - let sstable_store = mock_sstable_store().await; - let _mock_hummock_meta_client = Arc::new(MockHummockMetaClient::new( - hummock_manager_ref, - worker_id as _, - )); - let now_ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - let object_store_list_result = vec![ - ObjectMetadata { - key: sstable_store.get_sst_data_path(1), - last_modified: now_ts.sub(Duration::from_secs(7200)).as_secs_f64(), - total_size: 128, - }, - ObjectMetadata { - key: sstable_store.get_sst_data_path(2), - last_modified: now_ts.sub(Duration::from_secs(3600)).as_secs_f64(), - total_size: 128, - }, - ]; - let object_metadata_iter = Box::pin(stream::iter(object_store_list_result.into_iter().map(Ok))); - - let task = FullScanTask { - sst_retention_watermark: 0, - prefix: None, - start_after: None, - limit: None, - }; - let (scan_result, ..) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) - .await - .unwrap(); - assert!(scan_result.is_empty()); - - let task = FullScanTask { - sst_retention_watermark: now_ts.sub(Duration::from_secs(6000)).as_secs(), - prefix: None, - start_after: None, - limit: None, - }; - let (scan_result, ..) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) - .await - .unwrap(); - assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1]); - - let task = FullScanTask { - sst_retention_watermark: u64::MAX, - prefix: None, - start_after: None, - limit: None, - }; - let (scan_result, ..) = Vacuum::full_scan_inner(task, object_metadata_iter) - .await - .unwrap(); - assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1, 2]); -}