diff --git a/proto/hummock.proto b/proto/hummock.proto index ec0e20f6013c..869c5af867d4 100644 --- a/proto/hummock.proto +++ b/proto/hummock.proto @@ -498,9 +498,9 @@ message SubscribeCompactionEventResponse { oneof event { CompactTask compact_task = 1; - VacuumTask vacuum_task = 2; - FullScanTask full_scan_task = 3; - ValidationTask validation_task = 4; + VacuumTask vacuum_task = 2 [deprecated = true]; + FullScanTask full_scan_task = 3 [deprecated = true]; + ValidationTask validation_task = 4 [deprecated = true]; CancelCompactTask cancel_compact_task = 5; PullTaskAck pull_task_ack = 6; @@ -554,14 +554,6 @@ message CancelCompactTask { uint64 task_id = 2; } -message ReportVacuumTaskRequest { - VacuumTask vacuum_task = 1; -} - -message ReportVacuumTaskResponse { - common.Status status = 1; -} - message TriggerManualCompactionRequest { uint64 compaction_group_id = 1; KeyRange key_range = 2; @@ -574,23 +566,6 @@ message TriggerManualCompactionResponse { common.Status status = 1; } -message ReportFullScanTaskRequest { - // Object ids that satisfying conditions specified by FullScanTask. - repeated uint64 object_ids = 1; - // Total count of objects before filtered by conditions specified by FullScanTask. - uint64 total_object_count = 2; - // Total size of objects before filtered by conditions specified by FullScanTask. - uint64 total_object_size = 3; - // Start key of the next page. - optional string next_start_after = 4; - // Start key of this page. 
- optional string start_after = 5; -} - -message ReportFullScanTaskResponse { - common.Status status = 1; -} - message TriggerFullGCRequest { uint64 sst_retention_time_sec = 1; optional string prefix = 2; @@ -819,9 +794,7 @@ service HummockManagerService { rpc TriggerCompactionDeterministic(TriggerCompactionDeterministicRequest) returns (TriggerCompactionDeterministicResponse); rpc DisableCommitEpoch(DisableCommitEpochRequest) returns (DisableCommitEpochResponse); rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse); - rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse); rpc TriggerManualCompaction(TriggerManualCompactionRequest) returns (TriggerManualCompactionResponse); - rpc ReportFullScanTask(ReportFullScanTaskRequest) returns (ReportFullScanTaskResponse); rpc TriggerFullGC(TriggerFullGCRequest) returns (TriggerFullGCResponse); rpc RiseCtlGetPinnedVersionsSummary(RiseCtlGetPinnedVersionsSummaryRequest) returns (RiseCtlGetPinnedVersionsSummaryResponse); rpc RiseCtlListCompactionGroup(RiseCtlListCompactionGroupRequest) returns (RiseCtlListCompactionGroupResponse); diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 2ea5480eba6f..f64398c209f0 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -522,13 +522,6 @@ pub async fn start_service_as_election_leader( .await .unwrap(); - let vacuum_manager = Arc::new(hummock::VacuumManager::new( - env.clone(), - hummock_manager.clone(), - backup_manager.clone(), - compactor_manager.clone(), - )); - let ddl_srv = DdlServiceImpl::new( env.clone(), metadata_manager.clone(), @@ -560,12 +553,12 @@ pub async fn start_service_as_election_leader( let sink_coordination_srv = SinkCoordinationServiceImpl::new(sink_manager); let hummock_srv = HummockServiceImpl::new( hummock_manager.clone(), - vacuum_manager.clone(), metadata_manager.clone(), + backup_manager.clone(), ); let health_srv = HealthServiceImpl::new(); - let backup_srv = 
BackupServiceImpl::new(backup_manager); + let backup_srv = BackupServiceImpl::new(backup_manager.clone()); let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store()); let system_params_srv = SystemParamsServiceImpl::new( env.system_params_manager_impl_ref(), @@ -585,7 +578,6 @@ pub async fn start_service_as_election_leader( // sub_tasks executed concurrently. Can be shutdown via shutdown_all sub_tasks.extend(hummock::start_hummock_workers( hummock_manager.clone(), - vacuum_manager, // compaction_scheduler, &env.opts, )); @@ -603,7 +595,10 @@ pub async fn start_service_as_election_leader( sub_tasks.push(SystemParamsController::start_params_notifier( env.system_params_manager_impl_ref(), )); - sub_tasks.push(HummockManager::hummock_timer_task(hummock_manager.clone())); + sub_tasks.push(HummockManager::hummock_timer_task( + hummock_manager.clone(), + Some(backup_manager), + )); sub_tasks.extend(HummockManager::compaction_event_loop( hummock_manager, compactor_streams_change_rx, diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index 38562ab53ba2..0aa6e6a0c2e2 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -22,34 +22,34 @@ use risingwave_common::catalog::{TableId, SYS_CATALOG_START_ID}; use risingwave_hummock_sdk::key_range::KeyRange; use risingwave_hummock_sdk::version::HummockVersionDelta; use risingwave_hummock_sdk::HummockVersionId; +use risingwave_meta::backup_restore::BackupManagerRef; use risingwave_meta::manager::MetadataManager; use risingwave_pb::hummock::get_compaction_score_response::PickerInfo; use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerService; use risingwave_pb::hummock::subscribe_compaction_event_request::Event as RequestEvent; use risingwave_pb::hummock::*; -use thiserror_ext::AsReport; use tonic::{Request, Response, Status, Streaming}; use crate::hummock::compaction::selector::ManualCompactionOption; -use 
crate::hummock::{HummockManagerRef, VacuumManagerRef}; +use crate::hummock::HummockManagerRef; use crate::RwReceiverStream; pub struct HummockServiceImpl { hummock_manager: HummockManagerRef, - vacuum_manager: VacuumManagerRef, metadata_manager: MetadataManager, + backup_manager: BackupManagerRef, } impl HummockServiceImpl { pub fn new( hummock_manager: HummockManagerRef, - vacuum_trigger: VacuumManagerRef, metadata_manager: MetadataManager, + backup_manager: BackupManagerRef, ) -> Self { HummockServiceImpl { hummock_manager, - vacuum_manager: vacuum_trigger, metadata_manager, + backup_manager, } } } @@ -174,17 +174,6 @@ impl HummockManagerService for HummockServiceImpl { })) } - async fn report_vacuum_task( - &self, - request: Request, - ) -> Result, Status> { - if let Some(vacuum_task) = request.into_inner().vacuum_task { - self.vacuum_manager.report_vacuum_task(vacuum_task).await?; - } - sync_point::sync_point!("AFTER_REPORT_VACUUM"); - Ok(Response::new(ReportVacuumTaskResponse { status: None })) - } - async fn trigger_manual_compaction( &self, request: Request, @@ -245,49 +234,24 @@ impl HummockManagerService for HummockServiceImpl { })) } - async fn report_full_scan_task( - &self, - request: Request, - ) -> Result, Status> { - let req = request.into_inner(); - let hummock_manager = self.hummock_manager.clone(); - hummock_manager.update_paged_metrics( - req.start_after, - req.next_start_after.clone(), - req.total_object_count, - req.total_object_size, - ); - let pinned_by_metadata_backup = self.vacuum_manager.backup_manager.list_pinned_ssts(); - // The following operation takes some time, so we do it in dedicated task and responds the - // RPC immediately. 
- tokio::spawn(async move { - match hummock_manager - .complete_full_gc( - req.object_ids, - req.next_start_after, - pinned_by_metadata_backup, - ) - .await - { - Ok(number) => { - tracing::info!("Full GC results {} SSTs to delete", number); - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Full GC SST failed"); - } - } - }); - Ok(Response::new(ReportFullScanTaskResponse { status: None })) - } - async fn trigger_full_gc( &self, request: Request, ) -> Result, Status> { let req = request.into_inner(); - self.hummock_manager - .start_full_gc(Duration::from_secs(req.sst_retention_time_sec), req.prefix) - .await?; + let backup_manager_2 = self.backup_manager.clone(); + let hummock_manager_2 = self.hummock_manager.clone(); + tokio::task::spawn(async move { + use thiserror_ext::AsReport; + let _ = hummock_manager_2 + .start_full_gc( + Duration::from_secs(req.sst_retention_time_sec), + req.prefix, + Some(backup_manager_2), + ) + .await + .inspect_err(|e| tracing::warn!(error = %e.as_report(), "Failed to start GC.")); + }); Ok(Response::new(TriggerFullGcResponse { status: None })) } diff --git a/src/meta/src/hummock/manager/context.rs b/src/meta/src/hummock/manager/context.rs index 6256c09ce75a..206da2ff7692 100644 --- a/src/meta/src/hummock/manager/context.rs +++ b/src/meta/src/hummock/manager/context.rs @@ -235,7 +235,6 @@ impl HummockManager { // HummockManager::now requires a write to the meta store. Thus, it should be avoided whenever feasible. if !sstables.is_empty() { // Sanity check to ensure SSTs to commit have not been full GCed yet. - // TODO: since HummockManager::complete_full_gc have already filtered out SSTs by min uncommitted SST id, this sanity check can be removed. 
let now = self.now().await?; check_sst_retention( now, diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 4187ce818b34..1cf3ae25d171 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -16,81 +16,141 @@ use std::cmp; use std::collections::HashSet; use std::ops::Bound::{Excluded, Included}; use std::ops::DerefMut; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, SystemTime}; use chrono::DateTime; use futures::future::try_join_all; +use futures::{future, StreamExt, TryStreamExt}; use itertools::Itertools; -use parking_lot::Mutex; -use risingwave_hummock_sdk::HummockSstableObjectId; +use risingwave_common::system_param::reader::SystemParamsRead; +use risingwave_common::util::epoch::Epoch; +use risingwave_hummock_sdk::{ + get_object_id_from_path, get_sst_data_path, HummockSstableObjectId, OBJECT_SUFFIX, +}; use risingwave_meta_model::hummock_sequence::HUMMOCK_NOW; use risingwave_meta_model::{hummock_gc_history, hummock_sequence}; use risingwave_meta_model_migration::OnConflict; -use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; -use risingwave_pb::hummock::FullScanTask; +use risingwave_object_store::object::{ObjectMetadataIter, ObjectStoreRef}; use risingwave_pb::stream_service::GetMinUncommittedSstIdRequest; use risingwave_rpc_client::StreamClientPool; use sea_orm::{ActiveValue, ColumnTrait, EntityTrait, QueryFilter, Set}; +use crate::backup_restore::BackupManagerRef; use crate::hummock::error::{Error, Result}; use crate::hummock::manager::commit_multi_var; use crate::hummock::HummockManager; use crate::manager::MetadataManager; use crate::model::BTreeMapTransaction; +use crate::MetaResult; -#[derive(Default)] -pub(super) struct DeleteObjectTracker { - /// Objects that waits to be deleted from object store. It comes from either compaction, or - /// full GC (listing object store). 
- objects_to_delete: Mutex>, +pub(crate) struct GcManager { + store: ObjectStoreRef, + path_prefix: String, + use_new_object_prefix_strategy: bool, } -impl DeleteObjectTracker { - pub(super) fn add(&self, objects: impl Iterator) { - self.objects_to_delete.lock().extend(objects) +impl GcManager { + pub fn new( + store: ObjectStoreRef, + path_prefix: &str, + use_new_object_prefix_strategy: bool, + ) -> Self { + Self { + store, + path_prefix: path_prefix.to_owned(), + use_new_object_prefix_strategy, + } } - pub(super) fn current(&self) -> HashSet { - self.objects_to_delete.lock().clone() + /// Deletes all SSTs specified in the given list of IDs from storage. + pub async fn delete_objects( + &self, + object_id_list: impl Iterator, + ) -> Result<()> { + let mut paths = Vec::with_capacity(1000); + for object_id in object_id_list { + let obj_prefix = self + .store + .get_object_prefix(object_id, self.use_new_object_prefix_strategy); + paths.push(get_sst_data_path(&obj_prefix, &self.path_prefix, object_id)); + } + self.store.delete_objects(&paths).await?; + Ok(()) } - pub(super) fn clear(&self) { - self.objects_to_delete.lock().clear(); + async fn list_object_metadata_from_object_store( + &self, + prefix: Option, + start_after: Option, + limit: Option, + ) -> Result { + let list_path = format!("{}/{}", self.path_prefix, prefix.unwrap_or("".into())); + let raw_iter = self.store.list(&list_path, start_after, limit).await?; + let iter = raw_iter.filter(|r| match r { + Ok(i) => future::ready(i.key.ends_with(&format!(".{}", OBJECT_SUFFIX))), + Err(_) => future::ready(true), + }); + Ok(Box::pin(iter)) } - pub(super) fn ack<'a>(&self, objects: impl Iterator) { - let mut lock = self.objects_to_delete.lock(); - for object in objects { - lock.remove(object); - } + /// Returns **filtered** object ids, and **unfiltered** total object count and size. 
+ pub async fn list_objects( + &self, + sst_retention_watermark: u64, + prefix: Option, + start_after: Option, + limit: Option, + ) -> Result<(Vec, u64, u64, Option)> { + tracing::debug!( + sst_retention_watermark, + prefix, + start_after, + limit, + "Try to list objects." + ); + let mut total_object_count = 0; + let mut total_object_size = 0; + let mut next_start_after: Option = None; + let metadata_iter = self + .list_object_metadata_from_object_store(prefix, start_after, limit.map(|i| i as usize)) + .await?; + let filtered = metadata_iter + .filter_map(|r| { + let result = match r { + Ok(o) => { + total_object_count += 1; + total_object_size += o.total_size; + // Determine if the LIST has been truncated. + // A false positive would at most cost one additional LIST later. + if let Some(limit) = limit + && limit == total_object_count + { + next_start_after = Some(o.key.clone()); + tracing::debug!(next_start_after, "set next start after"); + } + if o.last_modified < sst_retention_watermark as f64 { + Some(Ok(get_object_id_from_path(&o.key))) + } else { + None + } + } + Err(e) => Some(Err(Error::ObjectStore(e))), + }; + async move { result } + }) + .try_collect::>() + .await?; + Ok(( + filtered, + total_object_count, + total_object_size as u64, + next_start_after, + )) } } impl HummockManager { - /// Gets SST objects that is safe to be deleted from object store. - pub fn get_objects_to_delete(&self) -> Vec { - self.delete_object_tracker - .current() - .iter() - .cloned() - .collect_vec() - } - - /// Acknowledges SSTs have been deleted from object store. 
- pub async fn ack_deleted_objects(&self, object_ids: &[HummockSstableObjectId]) -> Result<()> { - self.delete_object_tracker.ack(object_ids.iter()); - let mut versioning_guard = self.versioning.write().await; - for stale_objects in versioning_guard.checkpoint.stale_objects.values_mut() { - stale_objects.id.retain(|id| !object_ids.contains(id)); - } - versioning_guard - .checkpoint - .stale_objects - .retain(|_, stale_objects| !stale_objects.id.is_empty()); - drop(versioning_guard); - Ok(()) - } - /// Deletes at most `batch_size` deltas. /// /// Returns (number of deleted deltas, number of remain `deltas_to_delete`). @@ -131,18 +191,15 @@ impl HummockManager { Ok((batch.len(), deltas_to_delete.len() - batch.len())) } - /// Extends `objects_to_delete` according to object store full scan result. - /// Caller should ensure `object_ids` doesn't include any SST objects belong to a on-going - /// version write. That's to say, these `object_ids` won't appear in either `commit_epoch` or - /// `report_compact_task`. - pub async fn extend_objects_to_delete_from_scan( + /// Filters by Hummock version and writes GC history. + pub async fn finalize_objects_to_delete( &self, - object_ids: &[HummockSstableObjectId], - ) -> Result { + object_ids: impl Iterator + Clone, + ) -> Result> { + // This lock ensures `commit_epoch` and `report_compact_task` can see the latest GC history during sanity check. 
+ let versioning = self.versioning.read().await; let tracked_object_ids: HashSet = { - let versioning = self.versioning.read().await; let context_info = self.context_info.read().await; - // object ids in checkpoint version let mut tracked_object_ids = versioning.checkpoint.version.get_object_ids(); // add object ids added between checkpoint version and current version @@ -165,89 +222,103 @@ impl HummockManager { ); tracked_object_ids }; - let to_delete = object_ids - .iter() - .filter(|object_id| !tracked_object_ids.contains(object_id)) - .copied() - .collect_vec(); - let to_delete_num = to_delete.len(); - self.write_gc_history(to_delete.iter().cloned()).await?; - // This lock ensures that during commit_epoch or report_compact_tasks, where versioning lock is held, - // no new objects will be marked for deletion here. - let _versioning = self.versioning.read().await; - self.delete_object_tracker.add(to_delete.into_iter()); - Ok(to_delete_num) + let to_delete = object_ids.filter(|object_id| !tracked_object_ids.contains(object_id)); + self.write_gc_history(to_delete.clone()).await?; + Ok(to_delete.collect()) } - /// Starts a full GC. - /// 1. Meta node sends a `FullScanTask` to a compactor in this method. - /// 2. The compactor returns scan result of object store to meta node. See - /// `HummockManager::full_scan_inner` in storage crate. - /// 3. Meta node decides which SSTs to delete. See `HummockManager::complete_full_gc`. - /// - /// Returns Ok(false) if there is no worker available. + /// LIST object store and DELETE stale objects, in batches. + /// GC can be very slow. Spawn a dedicated tokio task for it. 
pub async fn start_full_gc( &self, sst_retention_time: Duration, prefix: Option, - ) -> Result { + backup_manager: Option, + ) -> Result<()> { + if !self.full_gc_state.try_start() { + return Err(anyhow::anyhow!("failed to start GC due to an ongoing process").into()); + } + let _guard = scopeguard::guard(self.full_gc_state.clone(), |full_gc_state| { + full_gc_state.stop() + }); self.metrics.full_gc_trigger_count.inc(); - // Set a minimum sst_retention_time. let sst_retention_time = cmp::max( sst_retention_time, Duration::from_secs(self.env.opts.min_sst_retention_time_sec), ); - let start_after = self.full_gc_state.next_start_after(); - let limit = self.full_gc_state.limit; - tracing::info!( - retention_sec = sst_retention_time.as_secs(), - prefix = prefix.as_ref().unwrap_or(&String::from("")), - start_after, - limit, - "run full GC" - ); - - let compactor = match self.compactor_manager.next_compactor() { - None => { - tracing::warn!("full GC attempt but no available idle worker"); - return Ok(false); - } - Some(compactor) => compactor, - }; + let limit = self.env.opts.full_gc_object_limit; + let mut start_after = None; let sst_retention_watermark = self .now() .await? .saturating_sub(sst_retention_time.as_secs()); - compactor - .send_event(ResponseEvent::FullScanTask(FullScanTask { - sst_retention_watermark, + let mut total_object_count = 0; + let mut total_object_size = 0; + tracing::info!( + retention_sec = sst_retention_time.as_secs(), + prefix, + limit, + "Start GC." + ); + loop { + tracing::debug!( + retention_sec = sst_retention_time.as_secs(), prefix, start_after, limit, - })) - .map_err(|_| Error::CompactorUnreachable(compactor.context_id()))?; - Ok(true) + "Start a GC batch." 
+ ); + let (object_ids, batch_object_count, batch_object_size, next_start_after) = self + .gc_manager + .list_objects( + sst_retention_watermark, + prefix.clone(), + start_after.clone(), + Some(limit), + ) + .await?; + total_object_count += batch_object_count; + total_object_size += batch_object_size; + tracing::debug!( + ?object_ids, + batch_object_count, + batch_object_size, + "Finish listing a GC batch." + ); + self.complete_gc_batch(object_ids, backup_manager.clone()) + .await?; + if next_start_after.is_none() { + break; + } + start_after = next_start_after; + } + tracing::info!(total_object_count, total_object_size, "Finish GC"); + self.metrics.total_object_size.set(total_object_size as _); + self.metrics.total_object_count.set(total_object_count as _); + Ok(()) } - /// Given candidate SSTs to GC, filter out false positive. - /// Returns number of SSTs to GC. - pub async fn complete_full_gc( + /// Given candidate SSTs to delete, filter out false positive. + /// Returns number of SSTs to delete. + pub(crate) async fn complete_gc_batch( &self, object_ids: Vec, - next_start_after: Option, - pinned_by_metadata_backup: HashSet, + backup_manager: Option, ) -> Result { + if object_ids.is_empty() { + return Ok(0); + } + // It's crucial to get pinned_by_metadata_backup only after object_ids. + let pinned_by_metadata_backup = backup_manager + .as_ref() + .map(|b| b.list_pinned_ssts()) + .unwrap_or_default(); // It's crucial to collect_min_uncommitted_sst_id (i.e. `min_sst_id`) only after LIST object store (i.e. `object_ids`). // Because after getting `min_sst_id`, new compute nodes may join and generate new uncommitted SSTs that are not covered by `min_sst_id`. // By getting `min_sst_id` after `object_ids`, it's ensured `object_ids` won't include any SSTs from those new compute nodes. 
let min_sst_id = collect_min_uncommitted_sst_id(&self.metadata_manager, self.env.stream_client_pool()) .await?; - self.full_gc_state.set_next_start_after(next_start_after); - if object_ids.is_empty() { - tracing::info!("SST full scan returns no SSTs."); - return Ok(0); - } let metrics = &self.metrics; let candidate_object_number = object_ids.len(); metrics @@ -279,19 +350,23 @@ impl HummockManager { .collect_vec(); let after_metadata_backup = object_ids.len(); // filter by version - let after_version = self.extend_objects_to_delete_from_scan(&object_ids).await?; + let after_version = self + .finalize_objects_to_delete(object_ids.into_iter()) + .await?; + let after_version_count = after_version.len(); metrics .full_gc_selected_object_count - .observe(after_version as _); + .observe(after_version_count as _); tracing::info!( candidate_object_number, after_min_sst_id, after_time_travel, after_metadata_backup, - after_version, - "complete full gc" + after_version_count, + "complete gc batch" ); - Ok(after_version) + self.delete_objects(after_version).await?; + Ok(after_version_count) } pub async fn now(&self) -> Result { @@ -333,32 +408,6 @@ impl HummockManager { Ok(now) } - pub fn update_paged_metrics( - &self, - start_after: Option, - next_start_after: Option, - total_object_count_in_page: u64, - total_object_size_in_page: u64, - ) { - let mut paged_metrics = self.paged_metrics.lock(); - paged_metrics.total_object_size.update( - start_after.clone(), - next_start_after.clone(), - total_object_size_in_page, - ); - paged_metrics.total_object_count.update( - start_after, - next_start_after, - total_object_count_in_page, - ); - if let Some(total_object_size) = paged_metrics.total_object_size.take() { - self.metrics.total_object_size.set(total_object_size as _); - } - if let Some(total_object_count) = paged_metrics.total_object_count.take() { - self.metrics.total_object_count.set(total_object_count as _); - } - } - async fn write_gc_history( &self, object_ids: impl 
Iterator, @@ -390,6 +439,68 @@ impl HummockManager { .await?; Ok(()) } + + /// Deletes stale Hummock metadata. + /// + /// Returns number of deleted deltas + pub async fn delete_metadata(&self) -> MetaResult { + let batch_size = 64usize; + let mut total_deleted = 0; + loop { + if total_deleted != 0 && self.env.opts.vacuum_spin_interval_ms != 0 { + tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms)) + .await; + } + let (deleted, remain) = self.delete_version_deltas(batch_size).await?; + total_deleted += deleted; + if total_deleted == 0 || remain < batch_size { + break; + } + } + + let current_epoch_time = Epoch::now().physical_time(); + let epoch_watermark = Epoch::from_physical_time( + current_epoch_time.saturating_sub( + self.env + .system_params_reader() + .await + .time_travel_retention_ms(), + ), + ) + .0; + self.truncate_time_travel_metadata(epoch_watermark).await?; + + Ok(total_deleted) + } + + /// Deletes stale SST objects from object store. + /// + /// Returns the total count of deleted SST objects. 
+ pub async fn delete_objects( + &self, + mut objects_to_delete: Vec, + ) -> Result { + let total = objects_to_delete.len(); + let mut batch_size = 1000usize; + while !objects_to_delete.is_empty() { + if self.env.opts.vacuum_spin_interval_ms != 0 { + tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms)) + .await; + } + batch_size = cmp::min(objects_to_delete.len(), batch_size); + if batch_size == 0 { + break; + } + let delete_batch: HashSet<_> = objects_to_delete.drain(..batch_size).collect(); + tracing::debug!(?objects_to_delete, "Attempt to delete objects."); + let deleted_object_ids = delete_batch.clone(); + self.gc_manager + .delete_objects(delete_batch.into_iter()) + .await?; + tracing::debug!(?deleted_object_ids, "Finish deleting objects."); + } + Ok(total) + } } async fn collect_min_uncommitted_sst_id( @@ -417,108 +528,29 @@ async fn collect_min_uncommitted_sst_id( } pub struct FullGcState { - next_start_after: Mutex>, - limit: Option, + is_started: AtomicBool, } impl FullGcState { - pub fn new(limit: Option) -> Self { - Self { - next_start_after: Mutex::new(None), - limit, - } - } - - pub fn set_next_start_after(&self, next_start_after: Option) { - *self.next_start_after.lock() = next_start_after; - } - - pub fn next_start_after(&self) -> Option { - self.next_start_after.lock().clone() - } -} - -pub struct PagedMetrics { - total_object_count: PagedMetric, - total_object_size: PagedMetric, -} - -impl PagedMetrics { pub fn new() -> Self { Self { - total_object_count: PagedMetric::new(), - total_object_size: PagedMetric::new(), - } - } -} - -/// The metrics should be accumulated on a per-page basis and then finalized at the end. 
-pub struct PagedMetric { - /// identifier of a page - expect_start_key: Option, - /// accumulated metric value of pages seen so far - running_value: u64, - /// final metric value - sealed_value: Option, -} - -impl PagedMetric { - fn new() -> Self { - Self { - expect_start_key: None, - running_value: 0, - sealed_value: None, - } - } - - fn update( - &mut self, - current_start_key: Option, - next_start_key: Option, - value: u64, - ) { - // Encounter an update without pagination, replace current state. - if current_start_key.is_none() && next_start_key.is_none() { - self.running_value = value; - self.seal(); - return; - } - // Encounter an update from unexpected page, reset current state. - if current_start_key != self.expect_start_key { - self.reset(); - return; + is_started: AtomicBool::new(false), } - self.running_value += value; - // There are more pages to add. - if next_start_key.is_some() { - self.expect_start_key = next_start_key; - return; - } - // This is the last page, seal the metric value. 
- self.seal(); } - fn seal(&mut self) { - self.sealed_value = Some(self.running_value); - self.reset(); + pub fn try_start(&self) -> bool { + self.is_started + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() } - fn reset(&mut self) { - self.running_value = 0; - self.expect_start_key = None; - } - - fn take(&mut self) -> Option { - if self.sealed_value.is_some() { - self.reset(); - } - self.sealed_value.take() + pub fn stop(&self) { + self.is_started.store(false, Ordering::SeqCst); } } #[cfg(test)] mod tests { - use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -526,7 +558,6 @@ mod tests { use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_rpc_client::HummockMetaClient; - use super::{PagedMetric, ResponseEvent}; use crate::hummock::test_utils::{add_test_tables, setup_compute_env}; use crate::hummock::MockHummockMetaClient; @@ -538,49 +569,18 @@ mod tests { worker_id as _, )); let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); - let compactor_manager = hummock_manager.compactor_manager_ref_for_test(); - // No task scheduled because no available worker. 
- assert!(!hummock_manager - .start_full_gc( - Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1,), - None - ) - .await - .unwrap()); - - let mut receiver = compactor_manager.add_compactor(worker_id as _); - - assert!(hummock_manager - .start_full_gc( - Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec - 1), - None - ) - .await - .unwrap()); - let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() { - ResponseEvent::FullScanTask(task) => task, - _ => { - panic!() - } - }; - - assert!(hummock_manager + hummock_manager .start_full_gc( Duration::from_secs(hummock_manager.env.opts.min_sst_retention_time_sec + 1), - None + None, + None, ) .await - .unwrap()); - let _full_scan_task = match receiver.recv().await.unwrap().unwrap().event.unwrap() { - ResponseEvent::FullScanTask(task) => task, - _ => { - panic!() - } - }; + .unwrap(); // Empty input results immediate return, without waiting heartbeat. hummock_manager - .complete_full_gc(vec![], None, HashSet::default()) + .complete_gc_batch(vec![], None) .await .unwrap(); @@ -589,10 +589,9 @@ mod tests { assert_eq!( 3, hummock_manager - .complete_full_gc( + .complete_gc_batch( vec![i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64], None, - HashSet::new(), ) .await .unwrap() @@ -616,44 +615,12 @@ mod tests { assert_eq!( 1, hummock_manager - .complete_full_gc( + .complete_gc_batch( [committed_object_ids, vec![max_committed_object_id + 1]].concat(), None, - HashSet::default(), ) .await .unwrap() ); } - - #[test] - fn test_paged_metric() { - let mut metric = PagedMetric::new(); - fn assert_empty_state(metric: &mut PagedMetric) { - assert_eq!(metric.running_value, 0); - assert!(metric.expect_start_key.is_none()); - } - assert!(metric.sealed_value.is_none()); - assert_empty_state(&mut metric); - - metric.update(None, None, 100); - assert_eq!(metric.take().unwrap(), 100); - assert!(metric.take().is_none()); - assert_empty_state(&mut metric); - - // 
"start" is not a legal identifier for the first page - metric.update(Some("start".into()), Some("end".into()), 100); - assert!(metric.take().is_none()); - assert_empty_state(&mut metric); - - metric.update(None, Some("middle".into()), 100); - assert!(metric.take().is_none()); - assert_eq!(metric.running_value, 100); - assert_eq!(metric.expect_start_key, Some("middle".into())); - - metric.update(Some("middle".into()), None, 50); - assert_eq!(metric.take().unwrap(), 150); - assert!(metric.take().is_none()); - assert_empty_state(&mut metric); - } } diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 68479d64727a..2d16fc31c104 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -42,7 +42,7 @@ use crate::hummock::compaction::CompactStatus; use crate::hummock::error::Result; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::context::ContextInfo; -use crate::hummock::manager::gc::{DeleteObjectTracker, FullGcState, PagedMetrics}; +use crate::hummock::manager::gc::{FullGcState, GcManager}; use crate::hummock::CompactorManagerRef; use crate::manager::{MetaSrvEnv, MetadataManager}; use crate::model::{ClusterId, MetadataModelError}; @@ -91,9 +91,6 @@ pub struct HummockManager { pub compactor_manager: CompactorManagerRef, event_sender: HummockManagerEventSender, - - delete_object_tracker: DeleteObjectTracker, - object_store: ObjectStoreRef, version_checkpoint_path: String, version_archive_dir: String, @@ -108,12 +105,10 @@ pub struct HummockManager { // `compaction_state` will record the types of compact tasks that can be triggered in `hummock` // and suggest types with a certain priority. pub compaction_state: CompactionState, - full_gc_state: FullGcState, - /// Gather metrics that require accumulation across multiple operations. 
- /// For example, to get the total number of objects in object store, multiple LISTs are required because a single LIST can visit at most `full_gc_object_limit` objects. - paged_metrics: parking_lot::Mutex, + full_gc_state: Arc, now: Mutex, inflight_time_travel_query: Semaphore, + gc_manager: GcManager, } pub type HummockManagerRef = Arc; @@ -202,6 +197,7 @@ impl HummockManager { let sys_params = env.system_params_reader().await; let state_store_url = sys_params.state_store(); let state_store_dir: &str = sys_params.data_directory(); + let use_new_object_prefix_strategy: bool = sys_params.use_new_object_prefix_strategy(); let deterministic_mode = env.opts.compaction_deterministic_test; let mut object_store_config = env.opts.object_store_config.clone(); // For fs and hdfs object store, operations are not always atomic. @@ -243,8 +239,12 @@ impl HummockManager { let version_checkpoint_path = version_checkpoint_path(state_store_dir); let version_archive_dir = version_archive_dir(state_store_dir); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let full_gc_object_limit = env.opts.full_gc_object_limit; let inflight_time_travel_query = env.opts.max_inflight_time_travel_query; + let gc_manager = GcManager::new( + object_store.clone(), + state_store_dir, + use_new_object_prefix_strategy, + ); let instance = HummockManager { env, versioning: MonitoredRwLock::new( @@ -272,7 +272,6 @@ impl HummockManager { // compaction_request_channel: parking_lot::RwLock::new(None), compactor_manager, event_sender: tx, - delete_object_tracker: Default::default(), object_store, version_checkpoint_path, version_archive_dir, @@ -280,10 +279,10 @@ impl HummockManager { history_table_throughput: parking_lot::RwLock::new(HashMap::default()), compactor_streams_change_tx, compaction_state: CompactionState::new(), - full_gc_state: FullGcState::new(Some(full_gc_object_limit)), - paged_metrics: parking_lot::Mutex::new(PagedMetrics::new()), + full_gc_state: FullGcState::new().into(), now: 
Mutex::new(0), inflight_time_travel_query: Semaphore::new(inflight_time_travel_query as usize), + gc_manager, }; let instance = Arc::new(instance); instance.init_time_travel_state().await?; @@ -409,7 +408,6 @@ impl HummockManager { .map(|m| (m.context_id as HummockContextId, m.into())) .collect(); - self.delete_object_tracker.clear(); self.initial_compaction_group_config_after_load( versioning_guard, self.compaction_group_manager.write().await.deref_mut(), diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index d78f0bc16340..428bf48343f5 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -567,8 +567,6 @@ async fn test_hummock_manager_basic() { init_version_id + commit_log_count + register_log_count, ); } - // objects_to_delete is always empty because no compaction is ever invoked. - assert!(hummock_manager.get_objects_to_delete().is_empty()); assert_eq!( hummock_manager .delete_version_deltas(usize::MAX) @@ -580,7 +578,6 @@ async fn test_hummock_manager_basic() { hummock_manager.create_version_checkpoint(1).await.unwrap(), commit_log_count + register_log_count ); - assert!(hummock_manager.get_objects_to_delete().is_empty()); assert_eq!( hummock_manager .delete_version_deltas(usize::MAX) @@ -953,7 +950,8 @@ async fn test_hummock_compaction_task_heartbeat() { let compactor_manager = hummock_manager.compactor_manager_ref_for_test(); let _tx = compactor_manager.add_compactor(context_id); - let (join_handle, shutdown_tx) = HummockManager::hummock_timer_task(hummock_manager.clone()); + let (join_handle, shutdown_tx) = + HummockManager::hummock_timer_task(hummock_manager.clone(), None); // No compaction task available. 
assert!(hummock_manager @@ -1082,7 +1080,8 @@ async fn test_hummock_compaction_task_heartbeat_removal_on_node_removal() { let compactor_manager = hummock_manager.compactor_manager_ref_for_test(); let _tx = compactor_manager.add_compactor(context_id); - let (join_handle, shutdown_tx) = HummockManager::hummock_timer_task(hummock_manager.clone()); + let (join_handle, shutdown_tx) = + HummockManager::hummock_timer_task(hummock_manager.clone(), None); // No compaction task available. assert!(hummock_manager @@ -1181,24 +1180,20 @@ async fn test_extend_objects_to_delete() { }) .max() .unwrap(); - let orphan_sst_num = 10; + let orphan_sst_num: usize = 10; let all_object_ids = sst_infos .iter() .flatten() .map(|s| s.object_id) - .chain(max_committed_object_id + 1..=max_committed_object_id + orphan_sst_num) + .chain(max_committed_object_id + 1..=max_committed_object_id + orphan_sst_num as u64) .collect_vec(); - assert!(hummock_manager.get_objects_to_delete().is_empty()); assert_eq!( hummock_manager - .extend_objects_to_delete_from_scan(&all_object_ids) + .finalize_objects_to_delete(all_object_ids.clone().into_iter()) .await - .unwrap(), - orphan_sst_num as usize - ); - assert_eq!( - hummock_manager.get_objects_to_delete().len(), - orphan_sst_num as usize + .unwrap() + .len(), + orphan_sst_num ); // Checkpoint @@ -1206,50 +1201,30 @@ async fn test_extend_objects_to_delete() { hummock_manager.create_version_checkpoint(1).await.unwrap(), 6 ); - assert_eq!( - hummock_manager.get_objects_to_delete().len(), - orphan_sst_num as usize - ); // since version1 is still pinned, the sst removed in compaction can not be reclaimed. 
assert_eq!( hummock_manager - .extend_objects_to_delete_from_scan(&all_object_ids) + .finalize_objects_to_delete(all_object_ids.clone().into_iter()) .await - .unwrap(), - orphan_sst_num as usize + .unwrap() + .len(), + orphan_sst_num ); - let objects_to_delete = hummock_manager.get_objects_to_delete(); - assert_eq!(objects_to_delete.len(), orphan_sst_num as usize); let pinned_version2: HummockVersion = hummock_manager.pin_version(context_id).await.unwrap(); - let objects_to_delete = hummock_manager.get_objects_to_delete(); - assert_eq!( - objects_to_delete.len(), - orphan_sst_num as usize, - "{:?}", - objects_to_delete - ); hummock_manager .unpin_version_before(context_id, pinned_version2.id) .await .unwrap(); - let objects_to_delete = hummock_manager.get_objects_to_delete(); - assert_eq!( - objects_to_delete.len(), - orphan_sst_num as usize, - "{:?}", - objects_to_delete - ); // version1 is unpin, but version2 is pinned, and version2 is the checkpoint version. // stale objects are combined in the checkpoint of version2, so no sst to reclaim assert_eq!( hummock_manager - .extend_objects_to_delete_from_scan(&all_object_ids) + .finalize_objects_to_delete(all_object_ids.clone().into_iter()) .await - .unwrap(), - orphan_sst_num as usize + .unwrap() + .len(), + orphan_sst_num ); - let objects_to_delete = hummock_manager.get_objects_to_delete(); - assert_eq!(objects_to_delete.len(), orphan_sst_num as usize); let new_epoch = version_max_committed_epoch(&pinned_version2).next_epoch(); hummock_meta_client .commit_epoch( @@ -1272,13 +1247,12 @@ async fn test_extend_objects_to_delete() { // in the stale objects of version2 checkpoint assert_eq!( hummock_manager - .extend_objects_to_delete_from_scan(&all_object_ids) + .finalize_objects_to_delete(all_object_ids.clone().into_iter()) .await - .unwrap(), - orphan_sst_num as usize + 3 + .unwrap() + .len(), + orphan_sst_num + 3 ); - let objects_to_delete = hummock_manager.get_objects_to_delete(); - 
assert_eq!(objects_to_delete.len(), orphan_sst_num as usize + 3); } #[tokio::test] @@ -2573,3 +2547,43 @@ async fn test_merge_compaction_group_task_expired() { // task 2 report failed due to the compaction group is merged assert!(!sst_ids.contains(&report_sst_id)); } + +#[tokio::test] +async fn test_vacuum() { + let (_env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await; + let context_id = worker_id as _; + let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( + hummock_manager.clone(), + context_id, + )); + assert_eq!(hummock_manager.delete_metadata().await.unwrap(), 0); + hummock_manager.pin_version(context_id).await.unwrap(); + let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); + let sst_infos = add_test_tables( + hummock_manager.as_ref(), + hummock_meta_client.clone(), + compaction_group_id, + ) + .await; + assert_eq!(hummock_manager.delete_metadata().await.unwrap(), 0); + hummock_manager.create_version_checkpoint(1).await.unwrap(); + assert_eq!(hummock_manager.delete_metadata().await.unwrap(), 6); + assert_eq!(hummock_manager.delete_metadata().await.unwrap(), 0); + + hummock_manager + .unpin_version_before(context_id, HummockVersionId::MAX) + .await + .unwrap(); + hummock_manager.create_version_checkpoint(0).await.unwrap(); + let deleted = hummock_manager + .complete_gc_batch( + sst_infos + .iter() + .flat_map(|ssts| ssts.iter().map(|s| s.object_id)) + .collect(), + None, + ) + .await + .unwrap(); + assert_eq!(deleted, 3); +} diff --git a/src/meta/src/hummock/manager/timer_task.rs b/src/meta/src/hummock/manager/timer_task.rs index 5b62ba883e7d..7434dd1f40f9 100644 --- a/src/meta/src/hummock/manager/timer_task.rs +++ b/src/meta/src/hummock/manager/timer_task.rs @@ -31,11 +31,15 @@ use tokio::task::JoinHandle; use tokio_stream::wrappers::IntervalStream; use tracing::warn; +use crate::backup_restore::BackupManagerRef; use crate::hummock::metrics_utils::{trigger_lsm_stat, trigger_mv_stat}; use 
crate::hummock::{HummockManager, TASK_NORMAL}; impl HummockManager { - pub fn hummock_timer_task(hummock_manager: Arc) -> (JoinHandle<()>, Sender<()>) { + pub fn hummock_timer_task( + hummock_manager: Arc, + backup_manager: Option, + ) -> (JoinHandle<()>, Sender<()>) { let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); let join_handle = tokio::spawn(async move { const CHECK_PENDING_TASK_PERIOD_SEC: u64 = 300; @@ -433,13 +437,21 @@ impl HummockManager { HummockTimerEvent::FullGc => { let retention_sec = hummock_manager.env.opts.min_sst_retention_time_sec; - if hummock_manager - .start_full_gc(Duration::from_secs(retention_sec), None) - .await - .is_ok() - { - tracing::info!("Start full GC from meta node."); - } + let backup_manager_2 = backup_manager.clone(); + let hummock_manager_2 = hummock_manager.clone(); + tokio::task::spawn(async move { + use thiserror_ext::AsReport; + let _ = hummock_manager_2 + .start_full_gc( + Duration::from_secs(retention_sec), + None, + backup_manager_2, + ) + .await + .inspect_err(|e| { + warn!(error = %e.as_report(), "Failed to start GC.") + }); + }); } } } diff --git a/src/meta/src/hummock/manager/utils.rs b/src/meta/src/hummock/manager/utils.rs index 1a8eb2d5d27e..577e54996186 100644 --- a/src/meta/src/hummock/manager/utils.rs +++ b/src/meta/src/hummock/manager/utils.rs @@ -70,7 +70,6 @@ impl HummockManager { let mut compaction_guard = self.compaction.write().await; let mut versioning_guard = self.versioning.write().await; let mut context_info_guard = self.context_info.write().await; - let objects_to_delete = self.delete_object_tracker.current(); // We don't check `checkpoint` because it's allowed to update its in memory state without // persisting to object store. 
let get_state = |compaction_guard: &mut Compaction, @@ -110,9 +109,6 @@ impl HummockManager { mem_state, loaded_state, "hummock in-mem state is inconsistent with meta store state", ); - self.delete_object_tracker.clear(); - self.delete_object_tracker - .add(objects_to_delete.into_iter()); } pub async fn get_new_sst_ids(&self, number: u32) -> Result { diff --git a/src/meta/src/hummock/mock_hummock_meta_client.rs b/src/meta/src/hummock/mock_hummock_meta_client.rs index 75bc3f121f43..777509d53ed8 100644 --- a/src/meta/src/hummock/mock_hummock_meta_client.rs +++ b/src/meta/src/hummock/mock_hummock_meta_client.rs @@ -30,8 +30,8 @@ use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; use risingwave_hummock_sdk::sstable_info::SstableInfo; use risingwave_hummock_sdk::version::HummockVersion; use risingwave_hummock_sdk::{ - HummockContextId, HummockEpoch, HummockSstableObjectId, HummockVersionId, LocalSstableInfo, - SstObjectIdRange, SyncResult, + HummockContextId, HummockEpoch, HummockVersionId, LocalSstableInfo, SstObjectIdRange, + SyncResult, }; use risingwave_pb::common::{HostAddress, WorkerType}; use risingwave_pb::hummock::compact_task::TaskStatus; @@ -39,7 +39,7 @@ use risingwave_pb::hummock::subscribe_compaction_event_request::{Event, ReportTa use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; use risingwave_pb::hummock::{ compact_task, PbHummockVersion, SubscribeCompactionEventRequest, - SubscribeCompactionEventResponse, VacuumTask, + SubscribeCompactionEventResponse, }; use risingwave_rpc_client::error::{Result, RpcError}; use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient}; @@ -227,10 +227,6 @@ impl HummockMetaClient for MockHummockMetaClient { Ok(()) } - async fn report_vacuum_task(&self, _vacuum_task: VacuumTask) -> Result<()> { - Ok(()) - } - async fn trigger_manual_compaction( &self, _compaction_group_id: u64, @@ -241,17 +237,6 @@ impl HummockMetaClient for MockHummockMetaClient { todo!() 
} - async fn report_full_scan_task( - &self, - _filtered_object_ids: Vec, - _total_object_count: u64, - _total_object_size: u64, - _start_after: Option, - _next_start_after: Option, - ) -> Result<()> { - unimplemented!() - } - async fn trigger_full_gc( &self, _sst_retention_time_sec: u64, diff --git a/src/meta/src/hummock/mod.rs b/src/meta/src/hummock/mod.rs index 4150f51280d3..7b8af79ae960 100644 --- a/src/meta/src/hummock/mod.rs +++ b/src/meta/src/hummock/mod.rs @@ -25,8 +25,6 @@ pub mod mock_hummock_meta_client; pub mod model; pub mod test_utils; mod utils; -mod vacuum; - use std::time::Duration; pub use compactor_manager::*; @@ -34,30 +32,24 @@ pub use compactor_manager::*; pub use mock_hummock_meta_client::MockHummockMetaClient; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; -pub use vacuum::*; use crate::MetaOpts; /// Start hummock's asynchronous tasks. pub fn start_hummock_workers( hummock_manager: HummockManagerRef, - vacuum_manager: VacuumManagerRef, meta_opts: &MetaOpts, ) -> Vec<(JoinHandle<()>, Sender<()>)> { // These critical tasks are put in their own timer loop deliberately, to avoid long-running ones // from blocking others. let workers = vec![ start_checkpoint_loop( - hummock_manager, + hummock_manager.clone(), Duration::from_secs(meta_opts.hummock_version_checkpoint_interval_sec), meta_opts.min_delta_log_num_for_hummock_version_checkpoint, ), start_vacuum_metadata_loop( - vacuum_manager.clone(), - Duration::from_secs(meta_opts.vacuum_interval_sec), - ), - start_vacuum_object_loop( - vacuum_manager, + hummock_manager.clone(), Duration::from_secs(meta_opts.vacuum_interval_sec), ), ]; @@ -66,7 +58,7 @@ pub fn start_hummock_workers( /// Starts a task to periodically vacuum stale metadata. 
pub fn start_vacuum_metadata_loop( - vacuum: VacuumManagerRef, + hummock_manager: HummockManagerRef, interval: Duration, ) -> (JoinHandle<()>, Sender<()>) { let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); @@ -83,7 +75,7 @@ pub fn start_vacuum_metadata_loop( return; } } - if let Err(err) = vacuum.vacuum_metadata().await { + if let Err(err) = hummock_manager.delete_metadata().await { tracing::warn!(error = %err.as_report(), "Vacuum metadata error"); } } @@ -91,33 +83,6 @@ pub fn start_vacuum_metadata_loop( (join_handle, shutdown_tx) } -/// Starts a task to periodically vacuum stale objects. -pub fn start_vacuum_object_loop( - vacuum: VacuumManagerRef, - interval: Duration, -) -> (JoinHandle<()>, Sender<()>) { - let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); - let join_handle = tokio::spawn(async move { - let mut min_trigger_interval = tokio::time::interval(interval); - min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - loop { - tokio::select! { - // Wait for interval - _ = min_trigger_interval.tick() => {}, - // Shutdown vacuum - _ = &mut shutdown_rx => { - tracing::info!("Vacuum object loop is stopped"); - return; - } - } - if let Err(err) = vacuum.vacuum_object().await { - tracing::warn!(error = %err.as_report(), "Vacuum object error"); - } - } - }); - (join_handle, shutdown_tx) -} - pub fn start_checkpoint_loop( hummock_manager: HummockManagerRef, interval: Duration, diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs deleted file mode 100644 index bc04aa53b6a8..000000000000 --- a/src/meta/src/hummock/vacuum.rs +++ /dev/null @@ -1,302 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashSet; -use std::sync::Arc; -use std::time::Duration; - -use itertools::Itertools; -use risingwave_common::system_param::reader::SystemParamsRead; -use risingwave_common::util::epoch::Epoch; -use risingwave_hummock_sdk::HummockSstableObjectId; -use risingwave_pb::hummock::subscribe_compaction_event_response::Event as ResponseEvent; -use risingwave_pb::hummock::VacuumTask; -use thiserror_ext::AsReport; - -use super::CompactorManagerRef; -use crate::backup_restore::BackupManagerRef; -use crate::hummock::HummockManagerRef; -use crate::manager::MetaSrvEnv; -use crate::MetaResult; - -pub type VacuumManagerRef = Arc; - -pub struct VacuumManager { - env: MetaSrvEnv, - hummock_manager: HummockManagerRef, - pub backup_manager: BackupManagerRef, - /// Use the `CompactorManager` to dispatch `VacuumTask`. - compactor_manager: CompactorManagerRef, - /// SST object ids which have been dispatched to vacuum nodes but are not replied yet. - pending_object_ids: parking_lot::RwLock>, -} - -impl VacuumManager { - pub fn new( - env: MetaSrvEnv, - hummock_manager: HummockManagerRef, - backup_manager: BackupManagerRef, - compactor_manager: CompactorManagerRef, - ) -> Self { - Self { - env, - hummock_manager, - backup_manager, - compactor_manager, - pending_object_ids: Default::default(), - } - } - - /// Tries to delete stale Hummock metadata. 
- /// - /// Returns number of deleted deltas - pub async fn vacuum_metadata(&self) -> MetaResult { - let batch_size = 64usize; - let mut total_deleted = 0; - loop { - if total_deleted != 0 && self.env.opts.vacuum_spin_interval_ms != 0 { - tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms)) - .await; - } - let (deleted, remain) = self - .hummock_manager - .delete_version_deltas(batch_size) - .await?; - total_deleted += deleted; - if total_deleted == 0 || remain < batch_size { - break; - } - } - - let current_epoch_time = Epoch::now().physical_time(); - let epoch_watermark = Epoch::from_physical_time( - current_epoch_time.saturating_sub( - self.env - .system_params_reader() - .await - .time_travel_retention_ms(), - ), - ) - .0; - self.hummock_manager - .truncate_time_travel_metadata(epoch_watermark) - .await?; - - Ok(total_deleted) - } - - /// Schedules deletion of SST objects from object store. - /// - /// Returns SST objects scheduled in worker node. - pub async fn vacuum_object(&self) -> MetaResult> { - // Select SST objects to delete. - let objects_to_delete = { - // 1. Retry the pending SST objects first. - // It is possible some vacuum workers have been asked to vacuum these SST objects - // previously, but they don't report the results yet due to either latency - // or failure. This is OK since trying to delete the same SST object - // multiple times is safe. - let pending_object_ids = self.pending_object_ids.read().iter().cloned().collect_vec(); - if !pending_object_ids.is_empty() { - pending_object_ids - } else { - // 2. If no pending SST objects, then fetch new ones. - let objects_to_delete = self.hummock_manager.get_objects_to_delete(); - if objects_to_delete.is_empty() { - return Ok(vec![]); - } - // Track these SST object ids, so that we can remove them from metadata later. 
- self.pending_object_ids - .write() - .extend(objects_to_delete.clone()); - objects_to_delete - } - }; - - // Dispatch the vacuum task - let mut batch_idx = 0; - let batch_size = 500usize; - let mut sent_batch = Vec::with_capacity(objects_to_delete.len()); - while batch_idx < objects_to_delete.len() { - if batch_idx != 0 && self.env.opts.vacuum_spin_interval_ms != 0 { - tokio::time::sleep(Duration::from_millis(self.env.opts.vacuum_spin_interval_ms)) - .await; - } - let delete_batch = objects_to_delete - .iter() - .skip(batch_idx) - .take(batch_size) - .cloned() - .collect_vec(); - // 1. Pick a worker. - let compactor = match self.compactor_manager.next_compactor() { - None => { - tracing::warn!("No vacuum worker is available."); - break; - } - Some(compactor) => compactor, - }; - - // 2. Send task. - match compactor.send_event(ResponseEvent::VacuumTask(VacuumTask { - // The SST id doesn't necessarily have a counterpart SST file in S3, but - // it's OK trying to delete it. - sstable_object_ids: delete_batch.clone(), - })) { - Ok(_) => { - tracing::debug!( - "Try to vacuum SSTs {:?} in worker {}.", - delete_batch, - compactor.context_id() - ); - batch_idx += batch_size; - sent_batch.extend(delete_batch); - } - Err(err) => { - tracing::warn!( - error = %err.as_report(), - "Failed to send vacuum task to worker {}", - compactor.context_id(), - ); - self.compactor_manager - .remove_compactor(compactor.context_id()); - } - } - } - Ok(sent_batch) - } - - /// Acknowledges deletion of SSTs and deletes corresponding metadata. 
- pub async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> MetaResult<()> { - let deleted_object_ids = self - .pending_object_ids - .read() - .iter() - .filter(|p| vacuum_task.sstable_object_ids.contains(p)) - .cloned() - .collect_vec(); - if !deleted_object_ids.is_empty() { - self.hummock_manager - .ack_deleted_objects(&deleted_object_ids) - .await?; - self.pending_object_ids - .write() - .retain(|p| !deleted_object_ids.contains(p)); - } - tracing::info!("Finish vacuuming SSTs {:?}", vacuum_task.sstable_object_ids); - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use std::sync::Arc; - - use itertools::Itertools; - use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId; - use risingwave_hummock_sdk::HummockVersionId; - use risingwave_pb::hummock::VacuumTask; - use risingwave_rpc_client::HummockMetaClient; - - use crate::backup_restore::BackupManager; - use crate::hummock::test_utils::{add_test_tables, setup_compute_env}; - use crate::hummock::{MockHummockMetaClient, VacuumManager}; - - #[tokio::test] - async fn test_vacuum() { - let (env, hummock_manager, _cluster_manager, worker_id) = setup_compute_env(80).await; - let context_id = worker_id as _; - let hummock_meta_client: Arc = Arc::new(MockHummockMetaClient::new( - hummock_manager.clone(), - context_id, - )); - let compactor_manager = hummock_manager.compactor_manager_ref_for_test(); - let backup_manager = - Arc::new(BackupManager::for_test(env.clone(), hummock_manager.clone()).await); - let vacuum = Arc::new(VacuumManager::new( - env, - hummock_manager.clone(), - backup_manager, - compactor_manager.clone(), - )); - assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0); - assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0); - hummock_manager.pin_version(context_id).await.unwrap(); - let compaction_group_id = StaticCompactionGroupId::StateDefault.into(); - let sst_infos = add_test_tables( - hummock_manager.as_ref(), - hummock_meta_client.clone(), - 
compaction_group_id, - ) - .await; - assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0); - hummock_manager.create_version_checkpoint(1).await.unwrap(); - assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 6); - assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0); - - assert!(hummock_manager.get_objects_to_delete().is_empty()); - hummock_manager - .unpin_version_before(context_id, HummockVersionId::MAX) - .await - .unwrap(); - hummock_manager.create_version_checkpoint(0).await.unwrap(); - assert!(hummock_manager.get_objects_to_delete().is_empty()); - hummock_manager - .complete_full_gc( - sst_infos - .iter() - .flat_map(|ssts| ssts.iter().map(|s| s.object_id)) - .collect(), - None, - HashSet::default(), - ) - .await - .unwrap(); - assert_eq!(hummock_manager.get_objects_to_delete().len(), 3); - assert_eq!( - hummock_manager - .get_objects_to_delete() - .into_iter() - .sorted() - .collect::>(), - sst_infos[0] - .iter() - .map(|s| s.object_id) - .sorted() - .collect::>() - ); - // No SST deletion is scheduled because no available worker. - assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0); - let _receiver = compactor_manager.add_compactor(context_id); - // SST deletion is scheduled. - assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 3); - // The deletion is not acked yet. - assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 3); - // The vacuum task is reported. - vacuum - .report_vacuum_task(VacuumTask { - sstable_object_ids: sst_infos - .first() - .unwrap() - .iter() - .map(|s| s.object_id) - .collect_vec(), - }) - .await - .unwrap(); - // No objects_to_delete. 
- assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0); - } -} diff --git a/src/rpc_client/src/compactor_client.rs b/src/rpc_client/src/compactor_client.rs index 7daae12d2755..7f8618ed4ba2 100644 --- a/src/rpc_client/src/compactor_client.rs +++ b/src/rpc_client/src/compactor_client.rs @@ -20,8 +20,7 @@ use risingwave_common::util::addr::HostAddr; use risingwave_pb::hummock::hummock_manager_service_client::HummockManagerServiceClient; use risingwave_pb::hummock::{ GetNewSstIdsRequest, GetNewSstIdsResponse, ReportCompactionTaskRequest, - ReportCompactionTaskResponse, ReportFullScanTaskRequest, ReportFullScanTaskResponse, - ReportVacuumTaskRequest, ReportVacuumTaskResponse, + ReportCompactionTaskResponse, }; use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient; use risingwave_pb::meta::{GetSystemParamsRequest, GetSystemParamsResponse}; @@ -138,25 +137,6 @@ impl GrpcCompactorProxyClient { ) } - pub async fn report_full_scan_task( - &self, - request: ReportFullScanTaskRequest, - ) -> std::result::Result, tonic::Status> { - retry_rpc!( - self, - report_full_scan_task, - request, - ReportFullScanTaskResponse - ) - } - - pub async fn report_vacuum_task( - &self, - request: ReportVacuumTaskRequest, - ) -> std::result::Result, tonic::Status> { - retry_rpc!(self, report_vacuum_task, request, ReportVacuumTaskResponse) - } - pub async fn get_system_params( &self, ) -> std::result::Result, tonic::Status> { diff --git a/src/rpc_client/src/hummock_meta_client.rs b/src/rpc_client/src/hummock_meta_client.rs index b39ff31961aa..112be025db94 100644 --- a/src/rpc_client/src/hummock_meta_client.rs +++ b/src/rpc_client/src/hummock_meta_client.rs @@ -15,11 +15,9 @@ use async_trait::async_trait; use futures::stream::BoxStream; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{ - HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, SyncResult, -}; +use risingwave_hummock_sdk::{HummockEpoch, 
HummockVersionId, SstObjectIdRange, SyncResult}; use risingwave_pb::hummock::{ - PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, VacuumTask, + PbHummockVersion, SubscribeCompactionEventRequest, SubscribeCompactionEventResponse, }; use tokio::sync::mpsc::UnboundedSender; @@ -39,7 +37,6 @@ pub trait HummockMetaClient: Send + Sync + 'static { sync_result: SyncResult, is_log_store: bool, ) -> Result<()>; - async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> Result<()>; async fn trigger_manual_compaction( &self, compaction_group_id: u64, @@ -47,14 +44,6 @@ pub trait HummockMetaClient: Send + Sync + 'static { level: u32, sst_ids: Vec, ) -> Result<()>; - async fn report_full_scan_task( - &self, - filtered_object_ids: Vec, - total_object_count: u64, - total_object_size: u64, - start_after: Option, - next_start_after: Option, - ) -> Result<()>; async fn trigger_full_gc( &self, sst_retention_time_sec: u64, diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 24375d395a06..be733e8d4ec1 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -43,8 +43,7 @@ use risingwave_error::tonic::ErrorIsFromTonicServerImpl; use risingwave_hummock_sdk::compaction_group::StateTableId; use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionDelta}; use risingwave_hummock_sdk::{ - CompactionGroupId, HummockEpoch, HummockSstableObjectId, HummockVersionId, SstObjectIdRange, - SyncResult, + CompactionGroupId, HummockEpoch, HummockVersionId, SstObjectIdRange, SyncResult, }; use risingwave_pb::backup_service::backup_service_client::BackupServiceClient; use risingwave_pb::backup_service::*; @@ -1537,33 +1536,6 @@ impl HummockMetaClient for MetaClient { panic!("Only meta service can commit_epoch in production.") } - async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> Result<()> { - let req = ReportVacuumTaskRequest { - vacuum_task: Some(vacuum_task), - }; - 
self.inner.report_vacuum_task(req).await?; - Ok(()) - } - - async fn report_full_scan_task( - &self, - filtered_object_ids: Vec, - total_object_count: u64, - total_object_size: u64, - start_after: Option, - next_start_after: Option, - ) -> Result<()> { - let req = ReportFullScanTaskRequest { - object_ids: filtered_object_ids, - total_object_count, - total_object_size, - next_start_after, - start_after, - }; - self.inner.report_full_scan_task(req).await?; - Ok(()) - } - async fn trigger_manual_compaction( &self, compaction_group_id: u64, @@ -2118,9 +2090,7 @@ macro_rules! for_all_meta_rpc { ,{ hummock_client, trigger_compaction_deterministic, TriggerCompactionDeterministicRequest, TriggerCompactionDeterministicResponse } ,{ hummock_client, disable_commit_epoch, DisableCommitEpochRequest, DisableCommitEpochResponse } ,{ hummock_client, get_new_sst_ids, GetNewSstIdsRequest, GetNewSstIdsResponse } - ,{ hummock_client, report_vacuum_task, ReportVacuumTaskRequest, ReportVacuumTaskResponse } ,{ hummock_client, trigger_manual_compaction, TriggerManualCompactionRequest, TriggerManualCompactionResponse } - ,{ hummock_client, report_full_scan_task, ReportFullScanTaskRequest, ReportFullScanTaskResponse } ,{ hummock_client, trigger_full_gc, TriggerFullGcRequest, TriggerFullGcResponse } ,{ hummock_client, rise_ctl_get_pinned_versions_summary, RiseCtlGetPinnedVersionsSummaryRequest, RiseCtlGetPinnedVersionsSummaryResponse } ,{ hummock_client, rise_ctl_list_compaction_group, RiseCtlListCompactionGroupRequest, RiseCtlListCompactionGroupResponse } diff --git a/src/storage/hummock_sdk/src/lib.rs b/src/storage/hummock_sdk/src/lib.rs index 366fc02310eb..e66484a5b69e 100644 --- a/src/storage/hummock_sdk/src/lib.rs +++ b/src/storage/hummock_sdk/src/lib.rs @@ -406,6 +406,38 @@ impl EpochWithGap { } } +pub fn get_sst_data_path( + obj_prefix: &str, + path_prefix: &str, + object_id: HummockSstableObjectId, +) -> String { + let mut path = String::with_capacity( + path_prefix.len() + + 
"/".len() + + obj_prefix.len() + + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH + + ".".len() + + OBJECT_SUFFIX.len(), + ); + path.push_str(path_prefix); + path.push('/'); + path.push_str(obj_prefix); + path.push_str(&object_id.to_string()); + path.push('.'); + path.push_str(OBJECT_SUFFIX); + path +} + +pub fn get_object_id_from_path(path: &str) -> HummockSstableObjectId { + use itertools::Itertools; + let split = path.split(&['/', '.']).collect_vec(); + assert!(split.len() > 2); + assert_eq!(split[split.len() - 1], OBJECT_SUFFIX); + split[split.len() - 2] + .parse::() + .expect("valid sst id") +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/storage/hummock_test/src/lib.rs b/src/storage/hummock_test/src/lib.rs index a89ead17944f..7cac9f756d9e 100644 --- a/src/storage/hummock_test/src/lib.rs +++ b/src/storage/hummock_test/src/lib.rs @@ -26,8 +26,6 @@ mod snapshot_tests; mod state_store_tests; #[cfg(any(test, feature = "test"))] pub mod test_utils; -#[cfg(test)] -mod vacuum_tests; #[cfg(test)] mod hummock_read_version_tests; diff --git a/src/storage/hummock_test/src/vacuum_tests.rs b/src/storage/hummock_test/src/vacuum_tests.rs deleted file mode 100644 index 893d2a05dca9..000000000000 --- a/src/storage/hummock_test/src/vacuum_tests.rs +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::iter; -use std::ops::Sub; -use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -use futures::stream; -use itertools::Itertools; -use risingwave_meta::hummock::test_utils::setup_compute_env; -use risingwave_meta::hummock::MockHummockMetaClient; -use risingwave_object_store::object::ObjectMetadata; -use risingwave_pb::hummock::{FullScanTask, VacuumTask}; -use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; -use risingwave_storage::hummock::test_utils::{ - default_builder_opt_for_test, gen_default_test_sstable, -}; -use risingwave_storage::hummock::vacuum::Vacuum; - -#[tokio::test] -async fn test_vacuum() { - let sstable_store = mock_sstable_store().await; - // Put some SSTs to object store - let object_ids = (1..10).collect_vec(); - let mut sstables = vec![]; - for sstable_object_id in &object_ids { - let sstable = gen_default_test_sstable( - default_builder_opt_for_test(), - *sstable_object_id, - sstable_store.clone(), - ) - .await; - sstables.push(sstable); - } - - // Delete all existent SSTs and a nonexistent SSTs. Trying to delete a nonexistent SST is - // OK. 
- let nonexistent_id = 11u64; - let vacuum_task = VacuumTask { - sstable_object_ids: object_ids - .into_iter() - .chain(iter::once(nonexistent_id)) - .collect_vec(), - }; - let (_env, hummock_manager_ref, _cluster_ctl_ref, worker_id) = setup_compute_env(8080).await; - let mock_hummock_meta_client = Arc::new(MockHummockMetaClient::new( - hummock_manager_ref.clone(), - worker_id as _, - )); - Vacuum::handle_vacuum_task(sstable_store, &vacuum_task.sstable_object_ids) - .await - .unwrap(); - assert!(Vacuum::report_vacuum_task(vacuum_task, mock_hummock_meta_client).await); -} - -#[tokio::test] -async fn test_full_scan() { - let (_env, hummock_manager_ref, _cluster_ctl_ref, worker_id) = setup_compute_env(8080).await; - let sstable_store = mock_sstable_store().await; - let _mock_hummock_meta_client = Arc::new(MockHummockMetaClient::new( - hummock_manager_ref, - worker_id as _, - )); - let now_ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); - let object_store_list_result = vec![ - ObjectMetadata { - key: sstable_store.get_sst_data_path(1), - last_modified: now_ts.sub(Duration::from_secs(7200)).as_secs_f64(), - total_size: 128, - }, - ObjectMetadata { - key: sstable_store.get_sst_data_path(2), - last_modified: now_ts.sub(Duration::from_secs(3600)).as_secs_f64(), - total_size: 128, - }, - ]; - let object_metadata_iter = Box::pin(stream::iter(object_store_list_result.into_iter().map(Ok))); - - let task = FullScanTask { - sst_retention_watermark: 0, - prefix: None, - start_after: None, - limit: None, - }; - let (scan_result, ..) = Vacuum::full_scan_inner(task, object_metadata_iter.clone()) - .await - .unwrap(); - assert!(scan_result.is_empty()); - - let task = FullScanTask { - sst_retention_watermark: now_ts.sub(Duration::from_secs(6000)).as_secs(), - prefix: None, - start_after: None, - limit: None, - }; - let (scan_result, ..) 
= Vacuum::full_scan_inner(task, object_metadata_iter.clone()) - .await - .unwrap(); - assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1]); - - let task = FullScanTask { - sst_retention_watermark: u64::MAX, - prefix: None, - start_after: None, - limit: None, - }; - let (scan_result, ..) = Vacuum::full_scan_inner(task, object_metadata_iter) - .await - .unwrap(); - assert_eq!(scan_result.into_iter().sorted().collect_vec(), vec![1, 2]); -} diff --git a/src/storage/src/hummock/compactor/mod.rs b/src/storage/src/hummock/compactor/mod.rs index 772df4050b66..e687b60e0e65 100644 --- a/src/storage/src/hummock/compactor/mod.rs +++ b/src/storage/src/hummock/compactor/mod.rs @@ -21,7 +21,7 @@ use risingwave_pb::hummock::report_compaction_task_request::{ Event as ReportCompactionTaskEvent, HeartBeat as SharedHeartBeat, ReportTask as ReportSharedTask, }; -use risingwave_pb::hummock::{PbCompactTask, ReportFullScanTaskRequest, ReportVacuumTaskRequest}; +use risingwave_pb::hummock::PbCompactTask; use risingwave_rpc_client::GrpcCompactorProxyClient; use thiserror_ext::AsReport; use tokio::sync::mpsc; @@ -86,7 +86,6 @@ use crate::compaction_catalog_manager::{ use crate::hummock::compactor::compaction_utils::calculate_task_parallelism; use crate::hummock::compactor::compactor_runner::{compact_and_build_sst, compact_done}; use crate::hummock::iterator::{Forward, HummockIterator}; -use crate::hummock::vacuum::Vacuum; use crate::hummock::{ validate_ssts, BlockedXor16FilterBuilder, FilterBuilder, SharedComapctorObjectIdManager, SstableWriterFactory, UnifiedSstableWriterFactory, @@ -469,7 +468,6 @@ pub fn start_compactor( .compaction_event_consumed_latency .observe(consumed_latency_ms as _); - let meta_client = hummock_meta_client.clone(); let sstable_object_id_manager = sstable_object_id_manager.clone(); let compaction_catalog_manager_ref = compaction_catalog_manager_ref.clone(); @@ -555,48 +553,11 @@ pub fn start_compactor( } }); } - ResponseEvent::VacuumTask(vacuum_task) 
=> { - executor.spawn(async move { - match Vacuum::handle_vacuum_task( - context.sstable_store.clone(), - &vacuum_task.sstable_object_ids, - ) - .await - { - Ok(_) => { - Vacuum::report_vacuum_task(vacuum_task, meta_client).await; - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to vacuum task") - } - } - }); + ResponseEvent::VacuumTask(_) => { + unreachable!("unexpected vacuum task"); } - ResponseEvent::FullScanTask(full_scan_task) => { - executor.spawn(async move { - let start_after = full_scan_task.start_after.clone(); - match Vacuum::handle_full_scan_task( - full_scan_task, - context.sstable_store.clone(), - ) - .await - { - Ok((object_ids, total_object_count, total_object_size, next_start_after)) => { - Vacuum::report_full_scan_task( - object_ids, - total_object_count, - total_object_size, - start_after, - next_start_after, - meta_client, - ) - .await; - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to iter object"); - } - } - }); + ResponseEvent::FullScanTask(_) => { + unreachable!("unexpected scan task"); } ResponseEvent::ValidationTask(validation_task) => { let validation_task = ValidationTask::from(validation_task); @@ -764,52 +725,11 @@ pub fn start_shared_compactor( } } - dispatch_compaction_task_request::Task::VacuumTask(vacuum_task) => { - match Vacuum::handle_vacuum_task( - context.sstable_store.clone(), - &vacuum_task.sstable_object_ids, - ) - .await - { - Ok(_) => { - let report_vacuum_task_request = ReportVacuumTaskRequest { - vacuum_task: Some(vacuum_task), - }; - match cloned_grpc_proxy_client.report_vacuum_task(report_vacuum_task_request).await { - Ok(_) => tracing::info!("Finished vacuuming SSTs"), - Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report vacuum task"), - } - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to vacuum task") - } - } + dispatch_compaction_task_request::Task::VacuumTask(_) => { + unreachable!("unexpected vacuum task"); } - 
dispatch_compaction_task_request::Task::FullScanTask(full_scan_task) => { - let start_after = full_scan_task.start_after.clone(); - match Vacuum::handle_full_scan_task(full_scan_task, context.sstable_store.clone()) - .await - { - Ok((object_ids, total_object_count, total_object_size, next_start_after)) => { - let report_full_scan_task_request = ReportFullScanTaskRequest { - object_ids, - total_object_count, - total_object_size, - next_start_after, - start_after, - }; - match cloned_grpc_proxy_client - .report_full_scan_task(report_full_scan_task_request) - .await - { - Ok(_) => tracing::info!("Finished full scan SSTs"), - Err(e) => tracing::warn!(error = %e.as_report(), "Failed to report full scan task"), - } - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "Failed to iter object"); - } - } + dispatch_compaction_task_request::Task::FullScanTask(_) => { + unreachable!("unexpected scan task"); } dispatch_compaction_task_request::Task::ValidationTask(validation_task) => { let validation_task = ValidationTask::from(validation_task); diff --git a/src/storage/src/hummock/hummock_meta_client.rs b/src/storage/src/hummock/hummock_meta_client.rs index c23a037c589e..aa874d7c3fb5 100644 --- a/src/storage/src/hummock/hummock_meta_client.rs +++ b/src/storage/src/hummock/hummock_meta_client.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use async_trait::async_trait; use futures::stream::BoxStream; use risingwave_hummock_sdk::version::HummockVersion; -use risingwave_hummock_sdk::{HummockSstableObjectId, SstObjectIdRange, SyncResult}; -use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest, VacuumTask}; +use risingwave_hummock_sdk::{SstObjectIdRange, SyncResult}; +use risingwave_pb::hummock::{PbHummockVersion, SubscribeCompactionEventRequest}; use risingwave_rpc_client::error::Result; use risingwave_rpc_client::{CompactionEventItem, HummockMetaClient, MetaClient}; use tokio::sync::mpsc::UnboundedSender; @@ -71,10 +71,6 @@ impl HummockMetaClient for 
MonitoredHummockMetaClient { panic!("Only meta service can commit_epoch in production.") } - async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> Result<()> { - self.meta_client.report_vacuum_task(vacuum_task).await - } - async fn trigger_manual_compaction( &self, compaction_group_id: u64, @@ -87,25 +83,6 @@ impl HummockMetaClient for MonitoredHummockMetaClient { .await } - async fn report_full_scan_task( - &self, - filtered_object_ids: Vec<HummockSstableObjectId>, - total_object_count: u64, - total_object_size: u64, - start_after: Option<String>, - next_start_after: Option<String>, - ) -> Result<()> { - self.meta_client - .report_full_scan_task( - filtered_object_ids, - total_object_count, - total_object_size, - start_after, - next_start_after, - ) - .await - } - async fn trigger_full_gc( &self, sst_retention_time_sec: u64, diff --git a/src/storage/src/hummock/mod.rs b/src/storage/src/hummock/mod.rs index bdf81b70c027..56fcc985b28e 100644 --- a/src/storage/src/hummock/mod.rs +++ b/src/storage/src/hummock/mod.rs @@ -44,7 +44,6 @@ pub mod local_version; pub mod observer_manager; pub mod store; pub use store::*; -pub mod vacuum; mod validator; pub mod value; pub mod write_limiter; diff --git a/src/storage/src/hummock/sstable_store.rs b/src/storage/src/hummock/sstable_store.rs index 6b6e1dde7f87..402e3e37594d 100644 --- a/src/storage/src/hummock/sstable_store.rs +++ b/src/storage/src/hummock/sstable_store.rs @@ -25,11 +25,8 @@ use foyer::{ HybridCacheEntry, }; use futures::{future, StreamExt}; -use itertools::Itertools; use risingwave_hummock_sdk::sstable_info::SstableInfo; -use risingwave_hummock_sdk::{ - HummockSstableObjectId, HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH, OBJECT_SUFFIX, -}; +use risingwave_hummock_sdk::{HummockSstableObjectId, OBJECT_SUFFIX}; use risingwave_hummock_trace::TracedCachePolicy; use risingwave_object_store::object::{ ObjectError, ObjectMetadataIter, ObjectResult, ObjectStoreRef, ObjectStreamingUploader, @@ -234,27 +231,6 @@ impl SstableStore { Ok(()) } - /// Deletes 
all SSTs specified in the given list of IDs from storage and cache. - pub async fn delete_list( - &self, - object_id_list: &[HummockSstableObjectId], - ) -> HummockResult<()> { - let mut paths = Vec::with_capacity(object_id_list.len() * 2); - - for &object_id in object_id_list { - paths.push(self.get_sst_data_path(object_id)); - } - // Delete from storage. - self.store.delete_objects(&paths).await?; - - // Delete from cache. - for object_id in object_id_list { - self.meta_cache.remove(object_id); - } - - Ok(()) - } - pub fn delete_cache(&self, object_id: HummockSstableObjectId) -> HummockResult<()> { self.meta_cache.remove(&object_id); Ok(()) @@ -522,30 +498,11 @@ impl SstableStore { let obj_prefix = self .store .get_object_prefix(object_id, self.use_new_object_prefix_strategy); - let mut path = String::with_capacity( - self.path.len() - + "/".len() - + obj_prefix.len() - + HUMMOCK_SSTABLE_OBJECT_ID_MAX_DECIMAL_LENGTH - + ".".len() - + OBJECT_SUFFIX.len(), - ); - path.push_str(&self.path); - path.push('/'); - path.push_str(&obj_prefix); - path.push_str(&object_id.to_string()); - path.push('.'); - path.push_str(OBJECT_SUFFIX); - path + risingwave_hummock_sdk::get_sst_data_path(&obj_prefix, &self.path, object_id) } pub fn get_object_id_from_path(path: &str) -> HummockSstableObjectId { - let split = path.split(&['/', '.']).collect_vec(); - assert!(split.len() > 2); - assert_eq!(split[split.len() - 1], OBJECT_SUFFIX); - split[split.len() - 2] - .parse::<HummockSstableObjectId>() - .expect("valid sst id") + risingwave_hummock_sdk::get_object_id_from_path(path) } pub fn store(&self) -> ObjectStoreRef { diff --git a/src/storage/src/hummock/vacuum.rs b/src/storage/src/hummock/vacuum.rs deleted file mode 100644 index 31f3c75ec2bf..000000000000 --- a/src/storage/src/hummock/vacuum.rs +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright 2024 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use futures::{StreamExt, TryStreamExt}; -use risingwave_hummock_sdk::HummockSstableObjectId; -use risingwave_object_store::object::ObjectMetadataIter; -use risingwave_pb::hummock::{FullScanTask, VacuumTask}; -use risingwave_rpc_client::HummockMetaClient; -use thiserror_ext::AsReport; - -use super::{HummockError, HummockResult}; -use crate::hummock::{SstableStore, SstableStoreRef}; - -pub struct Vacuum; - -impl Vacuum { - /// Wrapper method that warns on any error and doesn't propagate it. - /// Returns false if any error. - pub async fn handle_vacuum_task( - sstable_store: SstableStoreRef, - sstable_object_ids: &[u64], - ) -> HummockResult<()> { - tracing::info!("try to vacuum SSTs {:?}", sstable_object_ids); - sstable_store.delete_list(sstable_object_ids).await?; - Ok(()) - } - - pub async fn report_vacuum_task( - vacuum_task: VacuumTask, - hummock_meta_client: Arc<dyn HummockMetaClient>, - ) -> bool { - match hummock_meta_client.report_vacuum_task(vacuum_task).await { - Ok(_) => { - tracing::info!("vacuuming SSTs succeeded"); - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "failed to report vacuum task"); - return false; - } - } - true - } - - pub async fn full_scan_inner( - full_scan_task: FullScanTask, - metadata_iter: ObjectMetadataIter, - ) -> HummockResult<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> { - let mut total_object_count = 0; - let mut total_object_size = 0; - let mut next_start_after: Option<String> = None; - let filtered = metadata_iter - .filter_map(|r| { - let result = match r { - Ok(o) => { - total_object_count += 1; - total_object_size += 
o.total_size; - // Determine if the LIST has been truncated. - // A false positives would at most cost one additional LIST later. - if let Some(limit) = full_scan_task.limit - && limit == total_object_count - { - next_start_after = Some(o.key.clone()); - tracing::debug!(next_start_after, "set next start after"); - } - if o.last_modified < full_scan_task.sst_retention_watermark as f64 { - Some(Ok(SstableStore::get_object_id_from_path(&o.key))) - } else { - None - } - } - Err(e) => Some(Err(HummockError::from(e))), - }; - async move { result } - }) - .try_collect::<Vec<HummockSstableObjectId>>() - .await?; - Ok(( - filtered, - total_object_count, - total_object_size as u64, - next_start_after, - )) - } - - /// Returns **filtered** object ids, and **unfiltered** total object count and size. - pub async fn handle_full_scan_task( - full_scan_task: FullScanTask, - sstable_store: SstableStoreRef, - ) -> HummockResult<(Vec<HummockSstableObjectId>, u64, u64, Option<String>)> { - tracing::info!( - sst_retention_watermark = full_scan_task.sst_retention_watermark, - prefix = full_scan_task.prefix.as_ref().unwrap_or(&String::from("")), - start_after = full_scan_task.start_after, - limit = full_scan_task.limit, - "try to full scan SSTs" - ); - let metadata_iter = sstable_store - .list_object_metadata_from_object_store( - full_scan_task.prefix.clone(), - full_scan_task.start_after.clone(), - full_scan_task.limit.map(|i| i as usize), - ) - .await?; - Vacuum::full_scan_inner(full_scan_task, metadata_iter).await - } - - pub async fn report_full_scan_task( - filtered_object_ids: Vec<HummockSstableObjectId>, - unfiltered_count: u64, - unfiltered_size: u64, - start_after: Option<String>, - next_start_after: Option<String>, - hummock_meta_client: Arc<dyn HummockMetaClient>, - ) -> bool { - tracing::info!( - filtered_object_ids_len = filtered_object_ids.len(), - unfiltered_count, - unfiltered_size, - start_after, - next_start_after, - "try to report full scan task" - ); - match hummock_meta_client - .report_full_scan_task( - filtered_object_ids, - unfiltered_count, - unfiltered_size, - start_after, - 
next_start_after, - ) - .await - { - Ok(_) => { - tracing::info!("full scan SSTs succeeded"); - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "failed to report full scan task"); - return false; - } - } - true - } -}