Commit
refactor(meta): validate time_travel_retention_ms (#18979)
zwang28 authored Oct 21, 2024
1 parent 43544ac commit a04365a
Showing 9 changed files with 67 additions and 68 deletions.
3 changes: 3 additions & 0 deletions e2e_test/time_travel/basic.slt
@@ -1,6 +1,9 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement error time_travel_retention_ms cannot be less than 600000
ALTER SYSTEM SET time_travel_retention_ms to 10;

statement ok
CREATE TABLE t (k INT);

11 changes: 11 additions & 0 deletions src/common/src/system_param/mod.rs
@@ -419,6 +419,17 @@ impl ValidateOnSet for OverrideValidateOnSet {
}
Ok(())
}

fn time_travel_retention_ms(v: &u64) -> Result<()> {
// This is intended to guarantee that non-time-travel batch queries can still function even if the compute node's recent versions don't include the desired version.
let min_retention_ms = 600_000;
if *v < min_retention_ms {
return Err(format!(
"time_travel_retention_ms cannot be less than {min_retention_ms}"
));
}
Ok(())
}
}

for_all_params!(impl_default_from_other_params);
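For reference, a minimal standalone sketch of the rule the new validator enforces; the free function and test module below are illustrative, not part of this commit:

```rust
/// Illustrative re-statement of the new rule: reject any retention shorter
/// than ten minutes.
fn validate_time_travel_retention_ms(v: u64) -> Result<(), String> {
    const MIN_RETENTION_MS: u64 = 600_000; // 10 minutes
    if v < MIN_RETENTION_MS {
        return Err(format!(
            "time_travel_retention_ms cannot be less than {MIN_RETENTION_MS}"
        ));
    }
    Ok(())
}

#[cfg(test)]
mod retention_validation_tests {
    use super::*;

    #[test]
    fn rejects_values_below_the_floor() {
        assert!(validate_time_travel_retention_ms(10).is_err());
        assert!(validate_time_travel_retention_ms(599_999).is_err());
        assert!(validate_time_travel_retention_ms(600_000).is_ok());
    }
}
```

The 600 000 ms floor lines up with the error message asserted in the e2e test above and with the CI config bump below.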
2 changes: 1 addition & 1 deletion src/config/ci-time-travel.toml
@@ -2,4 +2,4 @@
hummock_time_travel_snapshot_interval = 30

[system]
time_travel_retention_ms = 300000
time_travel_retention_ms = 600000
7 changes: 6 additions & 1 deletion src/meta/service/src/hummock_service.rs
@@ -257,11 +257,16 @@ impl HummockManagerService for HummockServiceImpl {
req.total_object_count,
req.total_object_size,
);
let pinned_by_metadata_backup = self.vacuum_manager.backup_manager.list_pinned_ssts();
// The following operation takes some time, so we do it in a dedicated task and respond to the
// RPC immediately.
tokio::spawn(async move {
match hummock_manager
.complete_full_gc(req.object_ids, req.next_start_after)
.complete_full_gc(
req.object_ids,
req.next_start_after,
pinned_by_metadata_backup,
)
.await
{
Ok(number) => {
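The hunk above captures the backup-pinned SST set before spawning, so the gRPC handler can reply without waiting for GC to finish. A small sketch of that spawn-and-return pattern, assuming `tokio` and `tracing` are available (the helper name is made up for illustration):

```rust
/// Illustrative only: capture the inputs first, then run the slow work in a
/// background task so the RPC handler can reply right away.
fn spawn_background_gc<F>(slow_gc: F)
where
    F: std::future::Future<Output = Result<usize, String>> + Send + 'static,
{
    tokio::spawn(async move {
        match slow_gc.await {
            // In the real service the outcome is reported via metrics/logs.
            Ok(n) => tracing::info!("full GC completed, {n} objects handled"),
            Err(e) => tracing::warn!("full GC failed: {e}"),
        }
    });
}
```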
18 changes: 4 additions & 14 deletions src/meta/src/hummock/manager/checkpoint.rs
@@ -138,7 +138,6 @@ impl HummockManager {
drop(versioning_guard);
let versioning = self.versioning.read().await;
let context_info = self.context_info.read().await;
versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker);
let min_pinned_version_id = context_info.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
return Ok(0);
@@ -221,12 +220,9 @@ impl HummockManager {
.collect(),
});
}
// Whenever data archive or time travel is enabled, we can directly discard reference to stale objects that will no longer be used.
if self.env.opts.enable_hummock_data_archive || self.time_travel_enabled().await {
let context_info = self.context_info.read().await;
let min_pinned_version_id = context_info.min_pinned_version_id();
stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
}
// We can directly discard references to stale objects that will no longer be used.
let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
let new_checkpoint = HummockVersionCheckpoint {
version: current_version.clone(),
stale_objects,
@@ -246,15 +242,9 @@ impl HummockManager {
// 3. hold write lock and update in memory state
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let context_info = self.context_info.read().await;
assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
versioning.checkpoint = new_checkpoint;
// Not delete stale objects when archive or time travel is enabled
if !self.env.opts.enable_hummock_data_archive && !self.time_travel_enabled().await {
versioning.mark_objects_for_deletion(&context_info, &self.delete_object_tracker);
}

let min_pinned_version_id = context_info.min_pinned_version_id();
let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
trigger_split_stat(&self.metrics, &versioning.current_version);
drop(versioning_guard);
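The rewritten checkpoint path above applies the stale-object trimming unconditionally instead of gating it on data archive or time travel. A self-contained sketch of that retention step, with a `BTreeMap` standing in for the checkpoint's stale-object map:

```rust
use std::collections::BTreeMap;

type HummockVersionId = u64;

/// Keep only stale-object entries at or above the minimum pinned version;
/// older entries are dropped from the checkpoint's stale-object map.
fn retain_stale_objects<T>(
    stale_objects: &mut BTreeMap<HummockVersionId, T>,
    min_pinned_version_id: HummockVersionId,
) {
    stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
}
```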
17 changes: 14 additions & 3 deletions src/meta/src/hummock/manager/gc.rs
@@ -235,6 +235,7 @@ impl HummockManager {
&self,
object_ids: Vec<HummockSstableObjectId>,
next_start_after: Option<String>,
pinned_by_metadata_backup: HashSet<HummockSstableObjectId>,
) -> Result<usize> {
// It's crucial to collect_min_uncommitted_sst_id (i.e. `min_sst_id`) only after the LIST of the object store (i.e. `object_ids`),
// because after getting `min_sst_id`, new compute nodes may join and generate new uncommitted SSTs that are not covered by `min_sst_id`.
@@ -271,6 +272,12 @@ impl HummockManager {
.filter(|s| !pinned_object_ids.contains(s))
.collect_vec();
let after_time_travel = object_ids.len();
// filter by metadata backup
let object_ids = object_ids
.into_iter()
.filter(|s| !pinned_by_metadata_backup.contains(s))
.collect_vec();
let after_metadata_backup = object_ids.len();
// filter by version
let after_version = self.extend_objects_to_delete_from_scan(&object_ids).await?;
metrics
@@ -280,6 +287,7 @@
candidate_object_number,
after_min_sst_id,
after_time_travel,
after_metadata_backup,
after_version,
"complete full gc"
);
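A compact, self-contained sketch of the candidate filtering order shown above (min uncommitted SST id, then time-travel pins, then the metadata-backup pins passed in from the RPC). This is an illustrative reading, not the crate's API, and the `< min_uncommitted_sst_id` comparison is an assumption based on the comment about uncommitted SSTs:

```rust
use std::collections::HashSet;

type ObjectId = u64;

/// Illustrative filter chain for full-GC candidates. Each stage only shrinks
/// the candidate set; the real code additionally scans versions afterwards,
/// and the counters mirror the fields logged in the diff.
fn filter_gc_candidates(
    candidates: Vec<ObjectId>,
    min_uncommitted_sst_id: ObjectId,
    pinned_by_time_travel: &HashSet<ObjectId>,
    pinned_by_metadata_backup: &HashSet<ObjectId>,
) -> (Vec<ObjectId>, usize, usize, usize) {
    // 1. Keep only ids below the minimum uncommitted SST id; newer ids may
    //    belong to SSTs that are not committed yet.
    let ids: Vec<_> = candidates
        .into_iter()
        .filter(|id| *id < min_uncommitted_sst_id)
        .collect();
    let after_min_sst_id = ids.len();
    // 2. Drop ids still referenced by time travel.
    let ids: Vec<_> = ids
        .into_iter()
        .filter(|id| !pinned_by_time_travel.contains(id))
        .collect();
    let after_time_travel = ids.len();
    // 3. Drop ids pinned by metadata backups.
    let ids: Vec<_> = ids
        .into_iter()
        .filter(|id| !pinned_by_metadata_backup.contains(id))
        .collect();
    let after_metadata_backup = ids.len();
    (ids, after_min_sst_id, after_time_travel, after_metadata_backup)
}
```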
@@ -515,6 +523,7 @@ impl PagedMetric {

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

@@ -576,7 +585,7 @@ mod tests {

// Empty input results in an immediate return, without waiting for a heartbeat.
hummock_manager
.complete_full_gc(vec![], None)
.complete_full_gc(vec![], None, HashSet::default())
.await
.unwrap();

@@ -587,7 +596,8 @@
hummock_manager
.complete_full_gc(
vec![i64::MAX as u64 - 2, i64::MAX as u64 - 1, i64::MAX as u64],
None
None,
HashSet::new(),
)
.await
.unwrap()
@@ -613,7 +623,8 @@
hummock_manager
.complete_full_gc(
[committed_object_ids, vec![max_committed_object_id + 1]].concat(),
None
None,
HashSet::default(),
)
.await
.unwrap()
5 changes: 0 additions & 5 deletions src/meta/src/hummock/manager/mod.rs
@@ -410,11 +410,6 @@ impl HummockManager {
.collect();

self.delete_object_tracker.clear();
// Not delete stale objects when archive or time travel is enabled
if !self.env.opts.enable_hummock_data_archive && !self.time_travel_enabled().await {
versioning_guard.mark_objects_for_deletion(context_info, &self.delete_object_tracker);
}

self.initial_compaction_group_config_after_load(
versioning_guard,
self.compaction_group_manager.write().await.deref_mut(),
17 changes: 0 additions & 17 deletions src/meta/src/hummock/manager/versioning.rs
@@ -36,7 +36,6 @@ use crate::hummock::error::Result;
use crate::hummock::manager::checkpoint::HummockVersionCheckpoint;
use crate::hummock::manager::commit_multi_var;
use crate::hummock::manager::context::ContextInfo;
use crate::hummock::manager::gc::DeleteObjectTracker;
use crate::hummock::manager::transaction::HummockVersionTransaction;
use crate::hummock::metrics_utils::{trigger_write_stop_stats, LocalTableMetrics};
use crate::hummock::model::CompactionGroup;
@@ -82,22 +81,6 @@ impl ContextInfo {
}

impl Versioning {
/// Marks all objects <= `min_pinned_version_id` for deletion.
pub(super) fn mark_objects_for_deletion(
&self,
context_info: &ContextInfo,
delete_object_tracker: &DeleteObjectTracker,
) {
let min_pinned_version_id = context_info.min_pinned_version_id();
delete_object_tracker.add(
self.checkpoint
.stale_objects
.iter()
.filter(|(version_id, _)| **version_id <= min_pinned_version_id)
.flat_map(|(_, stale_objects)| stale_objects.id.iter().cloned()),
);
}

pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
self.time_travel_snapshot_interval_counter = u64::MAX;
}
55 changes: 28 additions & 27 deletions src/meta/src/hummock/vacuum.rs
@@ -35,7 +35,7 @@ pub type VacuumManagerRef = Arc<VacuumManager>;
pub struct VacuumManager {
env: MetaSrvEnv,
hummock_manager: HummockManagerRef,
backup_manager: BackupManagerRef,
pub backup_manager: BackupManagerRef,
/// Use the `CompactorManager` to dispatch `VacuumTask`.
compactor_manager: CompactorManagerRef,
/// SST object ids which have been dispatched to vacuum nodes but are not replied yet.
@@ -112,8 +112,7 @@ impl VacuumManager {
pending_object_ids
} else {
// 2. If no pending SST objects, then fetch new ones.
let mut objects_to_delete = self.hummock_manager.get_objects_to_delete();
self.filter_out_pinned_ssts(&mut objects_to_delete).await?;
let objects_to_delete = self.hummock_manager.get_objects_to_delete();
if objects_to_delete.is_empty() {
return Ok(vec![]);
}
@@ -178,29 +177,6 @@ impl VacuumManager {
Ok(sent_batch)
}

async fn filter_out_pinned_ssts(
&self,
objects_to_delete: &mut Vec<HummockSstableObjectId>,
) -> MetaResult<()> {
if objects_to_delete.is_empty() {
return Ok(());
}
let reject = self.backup_manager.list_pinned_ssts();
// Ack these SSTs immediately, because they tend to be pinned for a long time.
// They will be GCed during full GC when they are no longer pinned.
let to_ack = objects_to_delete
.iter()
.filter(|s| reject.contains(s))
.cloned()
.collect_vec();
if to_ack.is_empty() {
return Ok(());
}
self.hummock_manager.ack_deleted_objects(&to_ack).await?;
objects_to_delete.retain(|s| !reject.contains(s));
Ok(())
}

/// Acknowledges deletion of SSTs and deletes corresponding metadata.
pub async fn report_vacuum_task(&self, vacuum_task: VacuumTask) -> MetaResult<()> {
let deleted_object_ids = self
@@ -225,6 +201,7 @@

#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::sync::Arc;

use itertools::Itertools;
@@ -275,7 +252,31 @@ mod tests {
.await
.unwrap();
hummock_manager.create_version_checkpoint(0).await.unwrap();
assert!(!hummock_manager.get_objects_to_delete().is_empty());
assert!(hummock_manager.get_objects_to_delete().is_empty());
hummock_manager
.complete_full_gc(
sst_infos
.iter()
.flat_map(|ssts| ssts.iter().map(|s| s.object_id))
.collect(),
None,
HashSet::default(),
)
.await
.unwrap();
assert_eq!(hummock_manager.get_objects_to_delete().len(), 3);
assert_eq!(
hummock_manager
.get_objects_to_delete()
.into_iter()
.sorted()
.collect::<Vec<_>>(),
sst_infos[0]
.iter()
.map(|s| s.object_id)
.sorted()
.collect::<Vec<_>>()
);
// No SST deletion is scheduled because there is no available worker.
assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0);
let _receiver = compactor_manager.add_compactor(context_id);
