Skip to content

Commit

Permalink
feat(meta): use version object ids view to clean stale objects and de…
Browse files Browse the repository at this point in the history
…precate gc_object_ids (#16309)
  • Loading branch information
wenym1 authored Apr 18, 2024
1 parent 4aed67f commit 57c5727
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 68 deletions.
3 changes: 2 additions & 1 deletion proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ message HummockVersionDelta {
// Reads against such an epoch will fail.
uint64 safe_epoch = 5;
bool trivial_move = 6;
repeated uint64 gc_object_ids = 7;
reserved 7;
reserved "gc_object_ids";
map<uint32, TableWatermarks> new_table_watermarks = 8;
repeated uint32 removed_table_ids = 9;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ struct RwHummockVersionDelta {
max_committed_epoch: i64,
safe_epoch: i64,
trivial_move: bool,
gc_object_ids: JsonbVal,
group_deltas: JsonbVal,
}

Expand All @@ -42,7 +41,6 @@ async fn read(reader: &SysCatalogReaderImpl) -> Result<Vec<RwHummockVersionDelta
max_committed_epoch: d.max_committed_epoch as _,
safe_epoch: d.safe_epoch as _,
trivial_move: d.trivial_move,
gc_object_ids: json!(d.gc_object_ids).into(),
group_deltas: json!(d.group_deltas).into(),
})
.collect();
Expand Down
58 changes: 39 additions & 19 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,20 @@ use risingwave_pb::hummock::{PbHummockVersionArchive, PbHummockVersionCheckpoint
use thiserror_ext::AsReport;

use crate::hummock::error::Result;
use crate::hummock::manager::versioning::Versioning;
use crate::hummock::manager::{read_lock, write_lock};
use crate::hummock::metrics_utils::trigger_gc_stat;
use crate::hummock::HummockManager;

#[derive(Default)]
pub struct HummockVersionCheckpoint {
pub version: HummockVersion,

/// stale objects of versions before the current checkpoint.
///
/// Previously we stored the stale object of each single version.
/// Currently we will merge the stale object between two checkpoints, and only the
/// id of the checkpointed hummock version are included in the map.
pub stale_objects: HashMap<HummockVersionId, PbStaleObjects>,
}

Expand Down Expand Up @@ -119,18 +126,29 @@ impl HummockManager {
let timer = self.metrics.version_checkpoint_latency.start_timer();
// 1. hold read lock and create new checkpoint
let versioning_guard = read_lock!(self, versioning).await;
let versioning = versioning_guard.deref();
let current_version = &versioning.current_version;
let old_checkpoint = &versioning.checkpoint;
let versioning: &Versioning = versioning_guard.deref();
let current_version: &HummockVersion = &versioning.current_version;
let old_checkpoint: &HummockVersionCheckpoint = &versioning.checkpoint;
let new_checkpoint_id = current_version.id;
let old_checkpoint_id = old_checkpoint.version.id;
if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
return Ok(0);
}
if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
drop(versioning_guard);
let mut versioning = write_lock!(self, versioning).await;
versioning.mark_objects_for_deletion();
let min_pinned_version_id = versioning.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
return Ok(0);
}
assert!(new_checkpoint_id > old_checkpoint_id);
let mut archive: Option<PbHummockVersionArchive> = None;
let mut stale_objects = old_checkpoint.stale_objects.clone();
// `object_sizes` is used to calculate size of stale objects.
let mut object_sizes = object_size_map(&old_checkpoint.version);
// The set of object ids that once exist in any hummock version
let mut versions_object_ids = old_checkpoint.version.get_object_ids();
for (_, version_delta) in versioning
.hummock_version_deltas
.range((Excluded(old_checkpoint_id), Included(new_checkpoint_id)))
Expand All @@ -144,22 +162,24 @@ impl HummockManager {
.map(|t| (t.object_id, t.file_size)),
);
}
let removed_object_ids = version_delta.gc_object_ids.clone();
if removed_object_ids.is_empty() {
continue;
}
let total_file_size = removed_object_ids
.iter()
.map(|t| object_sizes.get(t).copied().unwrap())
.sum::<u64>();
stale_objects.insert(
version_delta.id,
StaleObjects {
id: removed_object_ids,
total_file_size,
},
);

versions_object_ids.extend(version_delta.newly_added_object_ids());
}

// Object ids that once exist in any hummock version but not exist in the latest hummock version
let removed_object_ids = &versions_object_ids - &current_version.get_object_ids();

let total_file_size = removed_object_ids
.iter()
.map(|t| object_sizes.get(t).copied().unwrap())
.sum::<u64>();
stale_objects.insert(
current_version.id,
StaleObjects {
id: removed_object_ids.into_iter().collect(),
total_file_size,
},
);
if self.env.opts.enable_hummock_data_archive {
archive = Some(PbHummockVersionArchive {
version: Some(old_checkpoint.version.to_protobuf()),
Expand Down Expand Up @@ -189,7 +209,7 @@ impl HummockManager {
// 3. hold write lock and update in memory state
let mut versioning_guard = write_lock!(self, versioning).await;
let versioning = versioning_guard.deref_mut();
assert!(new_checkpoint.version.id >= versioning.checkpoint.version.id);
assert!(new_checkpoint.version.id > versioning.checkpoint.version.id);
versioning.checkpoint = new_checkpoint;
// Not delete stale objects when archive is enabled
if !self.env.opts.enable_hummock_data_archive {
Expand Down
4 changes: 1 addition & 3 deletions src/meta/src/hummock/manager/compaction_group_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,7 @@ impl HummockManager {
// group is to be removed.
// However, we need to take care of SST GC for the removed group.
for (object_id, sst_id) in get_compaction_group_ssts(current_version, *group_id) {
if drop_sst(&mut branched_ssts, *group_id, object_id, sst_id) {
new_version_delta.gc_object_ids.push(object_id);
}
drop_sst(&mut branched_ssts, *group_id, object_id, sst_id);
}
let group_deltas = &mut new_version_delta
.group_deltas
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ impl HummockManager {
let versioning_guard = read_lock!(self, versioning).await;
let versioning: &Versioning = &versioning_guard;

// object ids in current version
let mut tracked_object_ids = versioning.current_version.get_object_ids();
// add object ids removed between checkpoint version and current version
// object ids in checkpoint version
let mut tracked_object_ids = versioning.checkpoint.version.get_object_ids();
// add object ids added between checkpoint version and current version
for (_, delta) in versioning.hummock_version_deltas.range((
Excluded(versioning.checkpoint.version.id),
Included(versioning.current_version.id),
)) {
tracked_object_ids.extend(delta.gc_object_ids.iter().cloned());
tracked_object_ids.extend(delta.newly_added_object_ids());
}
// add stale object ids before the checkpoint version
let min_pinned_version_id = versioning.min_pinned_version_id();
Expand Down
12 changes: 5 additions & 7 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,7 @@ impl HummockManager {
pub async fn check_state_consistency(&self) {
let mut compaction_guard = write_lock!(self, compaction).await;
let mut versioning_guard = write_lock!(self, versioning).await;
let objects_to_delete = versioning_guard.objects_to_delete.clone();
// We don't check `checkpoint` because it's allowed to update its in memory state without
// persisting to object store.
let get_state =
Expand Down Expand Up @@ -2022,6 +2023,7 @@ impl HummockManager {
mem_state, loaded_state,
"hummock in-mem state is inconsistent with meta store state",
);
versioning_guard.objects_to_delete = objects_to_delete;
}

/// Gets current version without pinning it.
Expand Down Expand Up @@ -3353,7 +3355,6 @@ fn gen_version_delta<'a>(
.entry(compact_task.compaction_group_id)
.or_default()
.group_deltas;
let mut gc_object_ids = vec![];
let mut removed_table_ids_map: BTreeMap<u32, Vec<u64>> = BTreeMap::default();

for level in &compact_task.input_ssts {
Expand All @@ -3364,15 +3365,13 @@ fn gen_version_delta<'a>(
.map(|sst| {
let object_id = sst.get_object_id();
let sst_id = sst.get_sst_id();
if !trivial_move
&& drop_sst(
if !trivial_move {
drop_sst(
branched_ssts,
compact_task.compaction_group_id,
object_id,
sst_id,
)
{
gc_object_ids.push(object_id);
);
}
sst_id
})
Expand Down Expand Up @@ -3405,7 +3404,6 @@ fn gen_version_delta<'a>(
})),
};
group_deltas.push(group_delta);
version_delta.gc_object_ids.append(&mut gc_object_ids);
version_delta.safe_epoch = std::cmp::max(old_version.safe_epoch, compact_task.watermark);

// Don't persist version delta generated by compaction to meta store in deterministic mode.
Expand Down
43 changes: 41 additions & 2 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,12 +1157,51 @@ async fn test_extend_objects_to_delete() {
);
let objects_to_delete = hummock_manager.get_objects_to_delete().await;
assert_eq!(objects_to_delete.len(), orphan_sst_num as usize);
let pinned_version2 = hummock_manager.pin_version(context_id).await.unwrap();
let pinned_version2: HummockVersion = hummock_manager.pin_version(context_id).await.unwrap();
let objects_to_delete = hummock_manager.get_objects_to_delete().await;
assert_eq!(
objects_to_delete.len(),
orphan_sst_num as usize,
"{:?}",
objects_to_delete
);
hummock_manager
.unpin_version_before(context_id, pinned_version2.id)
.await
.unwrap();
// version1 is unpin, and then the sst removed in compaction can be reclaimed
let objects_to_delete = hummock_manager.get_objects_to_delete().await;
assert_eq!(
objects_to_delete.len(),
orphan_sst_num as usize,
"{:?}",
objects_to_delete
);
// version1 is unpin, but version2 is pinned, and version2 is the checkpoint version.
// stale objects are combined in the checkpoint of version2, so no sst to reclaim
assert_eq!(
hummock_manager
.extend_objects_to_delete_from_scan(&all_object_ids)
.await,
orphan_sst_num as usize
);
let objects_to_delete = hummock_manager.get_objects_to_delete().await;
assert_eq!(objects_to_delete.len(), orphan_sst_num as usize);
let new_epoch = pinned_version2.max_committed_epoch.next_epoch();
hummock_manager
.commit_epoch(
new_epoch,
CommitEpochInfo::for_test(Vec::<ExtendedSstableInfo>::new(), Default::default()),
)
.await
.unwrap();
let pinned_version3: HummockVersion = hummock_manager.pin_version(context_id).await.unwrap();
assert_eq!(new_epoch, pinned_version3.max_committed_epoch);
hummock_manager
.unpin_version_before(context_id, pinned_version3.id)
.await
.unwrap();
// version3 is the min pinned, and sst removed in compaction can be reclaimed, because they were tracked
// in the stale objects of version2 checkpoint
assert_eq!(
hummock_manager
.extend_objects_to_delete_from_scan(&all_object_ids)
Expand Down
34 changes: 10 additions & 24 deletions src/meta/src/hummock/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,40 +228,29 @@ mod tests {
backup_manager,
compactor_manager.clone(),
));
assert_eq!(VacuumManager::vacuum_metadata(&vacuum).await.unwrap(), 0);
assert_eq!(
VacuumManager::vacuum_object(&vacuum).await.unwrap().len(),
0
);
assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0);
assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0);
hummock_manager.pin_version(context_id).await.unwrap();
let sst_infos = add_test_tables(hummock_manager.as_ref(), context_id).await;
assert_eq!(VacuumManager::vacuum_metadata(&vacuum).await.unwrap(), 0);
assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0);
hummock_manager.create_version_checkpoint(1).await.unwrap();
assert_eq!(VacuumManager::vacuum_metadata(&vacuum).await.unwrap(), 6);
assert_eq!(VacuumManager::vacuum_metadata(&vacuum).await.unwrap(), 0);
assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 6);
assert_eq!(vacuum.vacuum_metadata().await.unwrap(), 0);

assert!(hummock_manager.get_objects_to_delete().await.is_empty());
hummock_manager
.unpin_version_before(context_id, HummockVersionId::MAX)
.await
.unwrap();
hummock_manager.create_version_checkpoint(0).await.unwrap();
assert!(!hummock_manager.get_objects_to_delete().await.is_empty());
// No SST deletion is scheduled because no available worker.
assert_eq!(
VacuumManager::vacuum_object(&vacuum).await.unwrap().len(),
0
);
assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0);
let _receiver = compactor_manager.add_compactor(context_id);
// SST deletion is scheduled.
assert_eq!(
VacuumManager::vacuum_object(&vacuum).await.unwrap().len(),
3
);
assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 3);
// The deletion is not acked yet.
assert_eq!(
VacuumManager::vacuum_object(&vacuum).await.unwrap().len(),
3
);
assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 3);
// The vacuum task is reported.
vacuum
.report_vacuum_task(VacuumTask {
Expand All @@ -275,9 +264,6 @@ mod tests {
.await
.unwrap();
// No objects_to_delete.
assert_eq!(
VacuumManager::vacuum_object(&vacuum).await.unwrap().len(),
0
);
assert_eq!(vacuum.vacuum_object().await.unwrap().len(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,6 @@ pub fn build_version_delta_after_version(version: &HummockVersion) -> HummockVer
trivial_move: false,
max_committed_epoch: version.max_committed_epoch,
group_deltas: Default::default(),
gc_object_ids: vec![],
new_table_watermarks: HashMap::new(),
removed_table_ids: vec![],
}
Expand Down
Loading

0 comments on commit 57c5727

Please sign in to comment.