Skip to content

Commit

Permalink
refactor(meta): proactively GC more stale objects.
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Nov 20, 2024
1 parent 79dbb2c commit c93bbd0
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ pub async fn start_service_as_election_leader(
// sub_tasks executed concurrently. Can be shutdown via shutdown_all
sub_tasks.extend(hummock::start_hummock_workers(
hummock_manager.clone(),
// compaction_scheduler,
backup_manager.clone(),
&env.opts,
));
sub_tasks.push(start_worker_info_monitor(
Expand Down
45 changes: 33 additions & 12 deletions src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::ops::Bound::{Excluded, Included};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::Ordering;

use itertools::Itertools;
use risingwave_hummock_sdk::compaction_group::hummock_version_ext::object_size_map;
use risingwave_hummock_sdk::version::{GroupDeltaCommon, HummockVersion};
use risingwave_hummock_sdk::HummockVersionId;
use risingwave_hummock_sdk::{HummockSstableObjectId, HummockVersionId};
use risingwave_pb::hummock::hummock_version_checkpoint::{PbStaleObjects, StaleObjects};
use risingwave_pb::hummock::{
PbHummockVersion, PbHummockVersionArchive, PbHummockVersionCheckpoint,
Expand Down Expand Up @@ -122,7 +123,10 @@ impl HummockManager {
/// Returns the diff between new and old checkpoint id.
/// Note that this method must not be called concurrently, because internally it doesn't hold
/// lock throughout the method.
pub async fn create_version_checkpoint(&self, min_delta_log_num: u64) -> Result<u64> {
pub async fn create_version_checkpoint(
&self,
min_delta_log_num: u64,
) -> Result<(u64, HashSet<HummockSstableObjectId>)> {
let timer = self.metrics.version_checkpoint_latency.start_timer();
// 1. hold read lock and create new checkpoint
let versioning_guard = self.versioning.read().await;
Expand All @@ -132,15 +136,15 @@ impl HummockManager {
let new_checkpoint_id = current_version.id;
let old_checkpoint_id = old_checkpoint.version.id;
if new_checkpoint_id < old_checkpoint_id + min_delta_log_num {
return Ok(0);
return Ok((0, HashSet::default()));
}
if cfg!(test) && new_checkpoint_id == old_checkpoint_id {
drop(versioning_guard);
let versioning = self.versioning.read().await;
let context_info = self.context_info.read().await;
let min_pinned_version_id = context_info.min_pinned_version_id();
trigger_gc_stat(&self.metrics, &versioning.checkpoint, min_pinned_version_id);
return Ok(0);
return Ok((0, HashSet::default()));
}
assert!(new_checkpoint_id > old_checkpoint_id);
let mut archive: Option<PbHummockVersionArchive> = None;
Expand Down Expand Up @@ -203,13 +207,21 @@ impl HummockManager {
})
})
.sum::<u64>();
stale_objects.insert(
current_version.id,
StaleObjects {
stale_objects
.entry(current_version.id)
.and_modify(|s| {
s.id =
s.id.iter()
.chain(removed_object_ids.iter())
.unique()
.copied()
.collect();
s.total_file_size += total_file_size;
})
.or_insert(StaleObjects {
id: removed_object_ids.into_iter().collect(),
total_file_size,
},
);
});
if self.env.opts.enable_hummock_data_archive {
archive = Some(PbHummockVersionArchive {
version: Some(PbHummockVersion::from(&old_checkpoint.version)),
Expand All @@ -220,8 +232,17 @@ impl HummockManager {
.collect(),
});
}
// We can directly discard reference to stale objects that will no longer be used.
let min_pinned_version_id = self.context_info.read().await.min_pinned_version_id();
let may_delete_object = stale_objects
.iter()
.filter_map(|(version_id, object_ids)| {
if *version_id >= min_pinned_version_id {
return None;
}
Some(object_ids.id.clone())
})
.flatten()
.collect();
stale_objects.retain(|version_id, _| *version_id >= min_pinned_version_id);
let new_checkpoint = HummockVersionCheckpoint {
version: current_version.clone(),
Expand Down Expand Up @@ -253,7 +274,7 @@ impl HummockManager {
.checkpoint_version_id
.set(new_checkpoint_id.to_u64() as i64);

Ok(new_checkpoint_id - old_checkpoint_id)
Ok((new_checkpoint_id - old_checkpoint_id, may_delete_object))
}

pub fn pause_version_checkpoint(&self) {
Expand Down
20 changes: 20 additions & 0 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,26 @@ impl HummockManager {
}
Ok(total)
}

/// Minor GC attempts to delete objects that were part of Hummock version but are no longer in use.
pub async fn start_minor_gc(
&self,
backup_manager: BackupManagerRef,
object_ids: impl Iterator<Item = HummockSstableObjectId>,
) -> Result<()> {
// Objects pinned by either meta backup or time travel should be filtered out.
let pinned_objects: HashSet<_> = backup_manager
.list_pinned_ssts()
.into_iter()
.chain(self.all_object_ids_in_time_travel().await?)
.collect();
let object_ids = object_ids
.filter(|s| !pinned_objects.contains(s))
.collect_vec();
// Retry is not necessary. Full GC will handle these objects eventually.
self.delete_objects(object_ids).await?;
Ok(())
}
}

async fn collect_min_uncommitted_sst_id(
Expand Down
30 changes: 25 additions & 5 deletions src/meta/src/hummock/manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,11 @@ async fn test_hummock_manager_basic() {
(0, 0)
);
assert_eq!(
hummock_manager.create_version_checkpoint(1).await.unwrap(),
hummock_manager
.create_version_checkpoint(1)
.await
.unwrap()
.0,
commit_log_count + register_log_count
);
assert_eq!(
Expand Down Expand Up @@ -1198,7 +1202,11 @@ async fn test_extend_objects_to_delete() {

// Checkpoint
assert_eq!(
hummock_manager.create_version_checkpoint(1).await.unwrap(),
hummock_manager
.create_version_checkpoint(1)
.await
.unwrap()
.0,
6
);
// since version1 is still pinned, the sst removed in compaction can not be reclaimed.
Expand Down Expand Up @@ -2143,7 +2151,11 @@ async fn test_gc_stats() {
};
assert_eq_gc_stats(0, 0, 0, 0, 0, 0);
assert_eq!(
hummock_manager.create_version_checkpoint(0).await.unwrap(),
hummock_manager
.create_version_checkpoint(0)
.await
.unwrap()
.0,
0
);

Expand All @@ -2157,7 +2169,11 @@ async fn test_gc_stats() {
.await;
assert_eq_gc_stats(0, 0, 0, 0, 0, 0);
assert_ne!(
hummock_manager.create_version_checkpoint(0).await.unwrap(),
hummock_manager
.create_version_checkpoint(0)
.await
.unwrap()
.0,
0
);

Expand All @@ -2169,7 +2185,11 @@ async fn test_gc_stats() {

assert_eq_gc_stats(0, 0, 6, 3, 2, 4);
assert_eq!(
hummock_manager.create_version_checkpoint(0).await.unwrap(),
hummock_manager
.create_version_checkpoint(0)
.await
.unwrap()
.0,
0
);
}
Expand Down
70 changes: 50 additions & 20 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use risingwave_meta_model::{
hummock_epoch_to_version, hummock_sstable_info, hummock_time_travel_delta,
hummock_time_travel_version,
};
use risingwave_pb::hummock::hummock_version_checkpoint::PbStaleObjects;
use risingwave_pb::hummock::{PbHummockVersion, PbHummockVersionDelta};
use sea_orm::sea_query::OnConflict;
use sea_orm::ActiveValue::Set;
Expand Down Expand Up @@ -95,31 +96,37 @@ impl HummockManager {
"delete {} rows from hummock_epoch_to_version",
res.rows_affected
);
let earliest_valid_version = hummock_time_travel_version::Entity::find()
let latest_valid_version = hummock_time_travel_version::Entity::find()
.filter(
hummock_time_travel_version::Column::VersionId.lte(version_watermark.version_id),
)
.order_by_desc(hummock_time_travel_version::Column::VersionId)
.one(&txn)
.await?
.map(|m| HummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
let Some(earliest_valid_version) = earliest_valid_version else {
let Some(latest_valid_version) = latest_valid_version else {
txn.commit().await?;
return Ok(());
};
let (earliest_valid_version_id, earliest_valid_version_sst_ids) = {
let (
latest_valid_version_id,
latest_valid_version_sst_ids,
latest_valid_version_object_ids,
) = {
(
earliest_valid_version.id,
earliest_valid_version.get_sst_ids(),
latest_valid_version.id,
latest_valid_version.get_sst_ids(),
latest_valid_version.get_object_ids(),
)
};
let mut object_ids_to_delete: HashSet<_> = HashSet::default();
let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
hummock_time_travel_version::Entity::find()
.select_only()
.column(hummock_time_travel_version::Column::VersionId)
.filter(
hummock_time_travel_version::Column::VersionId
.lt(earliest_valid_version_id.to_u64()),
.lt(latest_valid_version_id.to_u64()),
)
.order_by_desc(hummock_time_travel_version::Column::VersionId)
.into_tuple()
Expand All @@ -131,7 +138,7 @@ impl HummockManager {
.column(hummock_time_travel_delta::Column::VersionId)
.filter(
hummock_time_travel_delta::Column::VersionId
.lt(earliest_valid_version_id.to_u64()),
.lt(latest_valid_version_id.to_u64()),
)
.into_tuple()
.all(&txn)
Expand All @@ -146,25 +153,27 @@ impl HummockManager {
delta_id_to_delete
)))
})?;
let new_sst_ids = HummockVersionDelta::from_persisted_protobuf(
let delta_to_delete = HummockVersionDelta::from_persisted_protobuf(
&delta_to_delete.version_delta.to_protobuf(),
)
.newly_added_sst_ids();
);
let new_sst_ids = delta_to_delete.newly_added_sst_ids();
// The SST ids added and then deleted by compaction between the 2 versions.
let sst_ids_to_delete = &new_sst_ids - &earliest_valid_version_sst_ids;
let sst_ids_to_delete = &new_sst_ids - &latest_valid_version_sst_ids;
let res = hummock_sstable_info::Entity::delete_many()
.filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete))
.exec(&txn)
.await?;
let new_object_ids = delta_to_delete.newly_added_object_ids();
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
tracing::debug!(
delta_id = delta_to_delete.version_id,
delta_id = delta_to_delete.id.to_u64(),
"delete {} rows from hummock_sstable_info",
res.rows_affected
);
}
let mut next_version_sst_ids = earliest_valid_version_sst_ids;
let mut next_version_sst_ids = latest_valid_version_sst_ids;
for prev_version_id in version_ids_to_delete {
let sst_ids = {
let prev_version = {
let prev_version = hummock_time_travel_version::Entity::find_by_id(prev_version_id)
.one(&txn)
.await?
Expand All @@ -175,45 +184,66 @@ impl HummockManager {
)))
})?;
HummockVersion::from_persisted_protobuf(&prev_version.version.to_protobuf())
.get_sst_ids()
};
let sst_ids = prev_version.get_sst_ids();
// The SST ids deleted by compaction between the 2 versions.
let sst_ids_to_delete = &sst_ids - &next_version_sst_ids;
let res = hummock_sstable_info::Entity::delete_many()
.filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete))
.exec(&txn)
.await?;
let new_object_ids = prev_version.get_object_ids();
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
tracing::debug!(
prev_version_id,
"delete {} rows from hummock_sstable_info",
res.rows_affected
);
next_version_sst_ids = sst_ids;
}
{
let mut guard = self.versioning.write().await;
guard
.checkpoint
.stale_objects
.entry(latest_valid_version_id)
.and_modify(|s| {
s.id =
s.id.iter()
.chain(object_ids_to_delete.iter())
.unique()
.copied()
.collect();
})
.or_insert(PbStaleObjects {
id: object_ids_to_delete.into_iter().collect(),
// To avoid unnecessary computations, disregard size in this context.
total_file_size: 0,
});
}

let res = hummock_time_travel_version::Entity::delete_many()
.filter(
hummock_time_travel_version::Column::VersionId
.lt(earliest_valid_version_id.to_u64()),
hummock_time_travel_version::Column::VersionId.lt(latest_valid_version_id.to_u64()),
)
.exec(&txn)
.await?;
tracing::debug!(
epoch_watermark_version_id = ?version_watermark.version_id,
?earliest_valid_version_id,
?latest_valid_version_id,
"delete {} rows from hummock_time_travel_version",
res.rows_affected
);

let res = hummock_time_travel_delta::Entity::delete_many()
.filter(
hummock_time_travel_delta::Column::VersionId.lt(earliest_valid_version_id.to_u64()),
hummock_time_travel_delta::Column::VersionId.lt(latest_valid_version_id.to_u64()),
)
.exec(&txn)
.await?;
tracing::debug!(
epoch_watermark_version_id = ?version_watermark.version_id,
?earliest_valid_version_id,
?latest_valid_version_id,
"delete {} rows from hummock_time_travel_delta",
res.rows_affected
);
Expand Down
Loading

0 comments on commit c93bbd0

Please sign in to comment.