diff --git a/e2e_test/time_travel/basic.slt b/e2e_test/time_travel/basic.slt index 962fd8e096a59..6dd65cf7125d3 100644 --- a/e2e_test/time_travel/basic.slt +++ b/e2e_test/time_travel/basic.slt @@ -1,6 +1,9 @@ statement ok SET RW_IMPLICIT_FLUSH TO true; +statement error time_travel_retention_ms cannot be less than 600000 +ALTER SYSTEM SET time_travel_retention_ms to 10; + statement ok CREATE TABLE t (k INT); diff --git a/src/common/src/system_param/mod.rs b/src/common/src/system_param/mod.rs index 2343e8e182dd9..4d3a994631534 100644 --- a/src/common/src/system_param/mod.rs +++ b/src/common/src/system_param/mod.rs @@ -419,6 +419,17 @@ impl ValidateOnSet for OverrideValidateOnSet { } Ok(()) } + + fn time_travel_retention_ms(v: &u64) -> Result<()> { + // This is intended to guarantee that non-time-travel batch query can still function even compute node's recent versions doesn't include the desired version. + let min_retention_ms = 600_000; + if *v < min_retention_ms { + return Err(format!( + "time_travel_retention_ms cannot be less than {min_retention_ms}" + )); + } + Ok(()) + } } for_all_params!(impl_default_from_other_params); diff --git a/src/config/ci-time-travel.toml b/src/config/ci-time-travel.toml index d160a8c94c69a..491d2bdecac0a 100644 --- a/src/config/ci-time-travel.toml +++ b/src/config/ci-time-travel.toml @@ -2,4 +2,4 @@ hummock_time_travel_snapshot_interval = 30 [system] -time_travel_retention_ms = 300000 +time_travel_retention_ms = 600000 diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs index c3c3fcd6aa0f0..4ef731a01f03f 100644 --- a/src/meta/service/src/hummock_service.rs +++ b/src/meta/service/src/hummock_service.rs @@ -257,11 +257,16 @@ impl HummockManagerService for HummockServiceImpl { req.total_object_count, req.total_object_size, ); + let pinned_by_metadata_backup = self.vacuum_manager.backup_manager.list_pinned_ssts(); // The following operation takes some time, so we do it in dedicated task and responds the // RPC immediately. tokio::spawn(async move { match hummock_manager - .complete_full_gc(req.object_ids, req.next_start_after) + .complete_full_gc( + req.object_ids, + req.next_start_after, + pinned_by_metadata_backup, + ) .await { Ok(number) => { diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index ea747dbf402e5..3949331098441 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -138,7 +138,6 @@ impl HummockManager { drop(versioning_guard); let versioning = self.versioning.read().await; let context_info = self.context_info.read().await; - versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker); let min_pinned_version_id = context_info.min_pinned_version_id(); trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id); return Ok(0); @@ -221,12 +220,9 @@ impl HummockManager { .collect(), }); } - // Whenever data archive or time travel is enabled, we can directly discard reference to stale objects that will no longer be used. - if self.env.opts.enable_hummock_data_archive || self.time_travel_enabled().await { - let context_info = self.context_info.read().await; - let min_pinned_version_id = context_info.min_pinned_version_id(); - stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id); - } + // We can directly discard reference to stale objects that will no longer be used. + let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id(); + stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id); let new_checkpoint = HummockVersionCheckpoint { version: current_version.clone(), stale_objects, @@ -246,15 +242,9 @@ impl HummockManager { // 3. hold write lock and update in memory state let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); - let context_info = self.context_info.read().await; assert!(new_checkpoint.version.id > versioning.checkpoint.version.id); versioning.checkpoint = new_checkpoint; - // Not delete stale objects when archive or time travel is enabled - if !self.env.opts.enable_hummock_data_archive && !self.time_travel_enabled().await { - versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker); - } - - let min_pinned_version_id = context_info.min_pinned_version_id(); + let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id(); trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id); trigger_split_stat(&self.metrics, &versioning.current_version); drop(versioning_guard); diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index 6117b446971f2..6c8eb32e6a6c8 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -235,6 +235,7 @@ impl HummockManager { &self, object_ids: Vec, next_start_after: Option, + pinned_by_metadata_backup: HashSet, ) -> Result { // It's crucial to collect_min_uncommitted_sst_id (i.e. `min_sst_id`) only after LIST object store (i.e. `object_ids`). // Because after getting `min_sst_id`, new compute nodes may join and generate new uncommitted SSTs that are not covered by `min_sst_id`. @@ -271,6 +272,12 @@ impl HummockManager { .filter(|s| !pinned_object_ids.contains(s)) .collect_vec(); let after_time_travel = object_ids.len(); + // filter by metadata backup + let object_ids = object_ids + .into_iter() + .filter(|s| !pinned_by_metadata_backup.contains(s)) + .collect_vec(); + let after_metadata_backup = object_ids.len(); // filter by version let after_version = self.extend_objects_to_delete_from_scan(&object_ids).await?; metrics @@ -280,6 +287,7 @@ impl HummockManager { candidate_object_number, after_min_sst_id, after_time_travel, + after_metadata_backup, after_version, "complete full gc" ); @@ -515,6 +523,7 @@ impl PagedMetric { #[cfg(test)] mod tests { + use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; @@ -576,7 +585,7 @@ mod tests { // Empty input results immediate return, without waiting heartbeat. hummock_manager - .complete_full_gc(vec![], None) + .complete_full_gc(vec![], None, HashSet::default()) .await .unwrap(); @@ -587,7 +596,8 @@ mod tests { hummock_manager .complete_full_gc( vec![i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64], - None + None, + HashSet::new(), ) .await .unwrap() @@ -613,7 +623,8 @@ mod tests { hummock_manager .complete_full_gc( [committed_object_ids, vec![max_committed_object_id + 1]].concat(), - None + None, + HashSet::default(), ) .await .unwrap() diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 8aef8a5e45ba8..68479d64727a2 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -410,11 +410,6 @@ impl HummockManager { .collect(); self.delete_object_tracker.clear(); - // Not delete stale objects when archive or time travel is enabled - if !self.env.opts.enable_hummock_data_archive && !self.time_travel_enabled().await { - versioning_guard.mark_objects_for_deletion(context_info, &self.delete_object_tracker); - } - self.initial_compaction_group_config_after_load( versioning_guard, self.compaction_group_manager.write().await.deref_mut(), diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs index 4d81689b9f705..7fa93f72d02b8 100644 --- a/src/meta/src/hummock/manager/versioning.rs +++ b/src/meta/src/hummock/manager/versioning.rs @@ -36,7 +36,6 @@ use crate::hummock::error::Result; use crate::hummock::manager::checkpoint::HummockVersionCheckpoint; use crate::hummock::manager::commit_multi_var; use crate::hummock::manager::context::ContextInfo; -use crate::hummock::manager::gc::DeleteObjectTracker; use crate::hummock::manager::transaction::HummockVersionTransaction; use crate::hummock::metrics_utils::{trigger_write_stop_stats, LocalTableMetrics}; use crate::hummock::model::CompactionGroup; @@ -82,22 +81,6 @@ impl ContextInfo { } impl Versioning { - /// Marks all objects <= `min_pinned_version_id` for deletion. - pub(super) fn mark_objects_for_deletion( - &self, - context_info: &ContextInfo, - delete_object_tracker: &DeleteObjectTracker, - ) { - let min_pinned_version_id = context_info.min_pinned_version_id(); - delete_object_tracker.add( - self.checkpoint - .stale_objects - .iter() - .filter(|(version_id, _)| **version_id <= min_pinned_version_id) - .flat_map(|(_, stale_objects)| stale_objects.id.iter().cloned()), - ); - } - pub(super) fn mark_next_time_travel_version_snapshot(&mut self) { self.time_travel_snapshot_interval_counter = u64::MAX; } diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index dcb2d95d7ca1d..bc04aa53b6a87 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -35,7 +35,7 @@ pub type VacuumManagerRef = Arc; pub struct VacuumManager { env: MetaSrvEnv, hummock_manager: HummockManagerRef, - backup_manager: BackupManagerRef, + pub backup_manager: BackupManagerRef, /// Use the `CompactorManager` to dispatch `VacuumTask`. compactor_manager: CompactorManagerRef, /// SST object ids which have been dispatched to vacuum nodes but are not replied yet. @@ -112,8 +112,7 @@ impl VacuumManager { pending_object_ids } else { // 2. If no pending SST objects, then fetch new ones. - let mut objects_to_delete = self.hummock_manager.get_objects_to_delete(); - self.filter_out_pinned_ssts(&mut objects_to_delete).await?; + let objects_to_delete = self.hummock_manager.get_objects_to_delete(); if objects_to_delete.is_empty() { return Ok(vec![]); } @@ -178,29 +177,6 @@ impl VacuumManager { Ok(sent_batch) } - async fn filter_out_pinned_ssts( - &self, - objects_to_delete: &mut Vec, - ) -> MetaResult<()> { - if objects_to_delete.is_empty() { - return Ok(()); - } - let reject = self.backup_manager.list_pinned_ssts(); - // Ack these SSTs immediately, because they tend to be pinned for long time. - // They will be GCed during full GC when they are no longer pinned. - let to_ack = objects_to_delete - .iter() - .filter(|s| reject.contains(s)) - .cloned() - .collect_vec(); - if to_ack.is_empty() { - return Ok(()); - } - self.hummock_manager.ack_deleted_objects(&to_ack).await?; - objects_to_delete.retain(|s| !reject.contains(s)); - Ok(()) - } - /// Acknowledges deletion of SSTs and deletes corresponding metadata. pub async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> MetaResult<()> { let deleted_object_ids = self @@ -225,6 +201,7 @@ impl VacuumManager { #[cfg(test)] mod tests { + use std::collections::HashSet; use std::sync::Arc; use itertools::Itertools; @@ -275,7 +252,31 @@ mod tests { .await .unwrap(); hummock_manager.create_version_checkpoint(0).await.unwrap(); - assert!(!hummock_manager.get_objects_to_delete().is_empty()); + assert!(hummock_manager.get_objects_to_delete().is_empty()); + hummock_manager + .complete_full_gc( + sst_infos + .iter() + .flat_map(|ssts| ssts.iter().map(|s| s.object_id)) + .collect(), + None, + HashSet::default(), + ) + .await + .unwrap(); + assert_eq!(hummock_manager.get_objects_to_delete().len(), 3); + assert_eq!( + hummock_manager + .get_objects_to_delete() + .into_iter() + .sorted() + .collect::>(), + sst_infos[0] + .iter() + .map(|s| s.object_id) + .sorted() + .collect::>() + ); // No SST deletion is scheduled because no available worker. assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0); let _receiver = compactor_manager.add_compactor(context_id);