[pruner] compaction filter for objects table
phoenix-o committed Jan 14, 2025
1 parent 35d516b commit db99597
Showing 11 changed files with 220 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


8 changes: 8 additions & 0 deletions crates/sui-config/src/node.rs
@@ -871,6 +871,13 @@ pub struct AuthorityStorePruningConfig {
pub killswitch_tombstone_pruning: bool,
#[serde(default = "default_smoothing", skip_serializing_if = "is_true")]
pub smooth: bool,
/// Enables the compaction filter for pruning the objects table.
/// If disabled, a range-deletion approach is used instead.
/// While it is generally safe to switch between the two modes,
/// switching from the compaction filter back to range deletion
/// may leave some old object versions unpruned.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub enable_compaction_filter: bool,
}

fn default_num_latest_epoch_dbs_to_retain() -> usize {
@@ -910,6 +917,7 @@ impl Default for AuthorityStorePruningConfig {
num_epochs_to_retain_for_checkpoints: if cfg!(msim) { Some(2) } else { None },
killswitch_tombstone_pruning: false,
smooth: true,
enable_compaction_filter: false,
}
}
}
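For reference, opting into the new mode from code only requires setting this flag on the struct. Below is a minimal sketch; it assumes the module path matches this file's location (sui_config::node) and relies only on the struct fields and Default impl shown above, with every other pruning setting left at its default.

use sui_config::node::AuthorityStorePruningConfig;

fn main() {
    // Enable compaction-filter-based pruning of the objects table;
    // all other pruning knobs keep their default values.
    let pruning_config = AuthorityStorePruningConfig {
        enable_compaction_filter: true,
        ..Default::default()
    };
    assert!(pruning_config.enable_compaction_filter);
}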
1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
@@ -15,6 +15,7 @@ arc-swap.workspace = true
async-trait.workspace = true
axum.workspace = true
bcs.workspace = true
bincode.workspace = true
bytes.workspace = true
consensus-core.workspace = true
consensus-config.workspace = true
4 changes: 4 additions & 0 deletions crates/sui-core/src/authority.rs
@@ -164,6 +164,7 @@ pub use crate::checkpoints::checkpoint_executor::{
init_checkpoint_timeout_config, CheckpointTimeoutConfig,
};

use crate::authority::authority_store_tables::AuthorityPrunerTables;
use crate::authority_client::NetworkAuthorityClient;
use crate::validator_tx_finalizer::ValidatorTxFinalizer;
#[cfg(msim)]
@@ -2863,6 +2864,7 @@ impl AuthorityState {
archive_readers: ArchiveReaderBalancer,
validator_tx_finalizer: Option<Arc<ValidatorTxFinalizer<NetworkAuthorityClient>>>,
chain_identifier: ChainIdentifier,
pruner_db: Option<Arc<AuthorityPrunerTables>>,
) -> Arc<Self> {
Self::check_protocol_version(supported_protocol_versions, epoch_store.protocol_version());

@@ -2894,6 +2896,7 @@
prometheus_registry,
indirect_objects_threshold,
archive_readers,
pruner_db,
);
let input_loader =
TransactionInputLoader::new(execution_cache_trait_pointers.object_cache_reader.clone());
@@ -3001,6 +3004,7 @@
&self.checkpoint_store,
self.rpc_index.as_deref(),
&self.database_for_testing().objects_lock_table,
None,
config.authority_store_pruning_config,
metrics,
config.indirect_objects_threshold,
1 change: 1 addition & 0 deletions crates/sui-core/src/authority/authority_store.rs
@@ -1802,6 +1802,7 @@ impl AuthorityStore {
checkpoint_store,
rpc_index,
&self.objects_lock_table,
None,
pruning_config,
AuthorityStorePruningMetrics::new_for_test(),
usize::MAX,
118 changes: 109 additions & 9 deletions crates/sui-core/src/authority/authority_store_pruner.rs
@@ -1,10 +1,14 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::authority::authority_store_types::{ObjectContentDigest, StoreData, StoreObject};
use super::authority_store_tables::{AuthorityPerpetualTables, AuthorityPrunerTables};
use crate::authority::authority_store_types::{
ObjectContentDigest, StoreData, StoreObject, StoreObjectWrapper,
};
use crate::checkpoints::{CheckpointStore, CheckpointWatermark};
use crate::rpc_index::RpcIndexStore;
use anyhow::anyhow;
use bincode::Options;
use mysten_metrics::{monitored_scope, spawn_monitored_task};
use once_cell::sync::Lazy;
use prometheus::{
@@ -13,7 +17,7 @@ use prometheus::{
};
use std::cmp::{max, min};
use std::collections::{BTreeSet, HashMap};
use std::sync::Mutex;
use std::sync::{Mutex, Weak};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{sync::Arc, time::Duration};
use sui_archival::reader::ArchiveReaderBalancer;
@@ -34,11 +38,10 @@ use sui_types::{
use tokio::sync::oneshot::{self, Sender};
use tokio::time::Instant;
use tracing::{debug, error, info, warn};
use typed_store::rocksdb::compaction_filter::Decision;
use typed_store::rocksdb::LiveFile;
use typed_store::{Map, TypedStoreError};

use super::authority_store_tables::AuthorityPerpetualTables;

static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
[
"objects",
@@ -128,13 +131,15 @@ impl AuthorityStorePruner {
transaction_effects: Vec<TransactionEffects>,
perpetual_db: &Arc<AuthorityPerpetualTables>,
objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
pruner_db: Option<&Arc<AuthorityPrunerTables>>,
checkpoint_number: CheckpointSequenceNumber,
metrics: Arc<AuthorityStorePruningMetrics>,
indirect_objects_threshold: usize,
enable_pruning_tombstones: bool,
) -> anyhow::Result<()> {
let _scope = monitored_scope("ObjectsLivePruner");
let mut wb = perpetual_db.objects.batch();
let mut pruner_db_wb = pruner_db.map(|db| db.object_tombstones.batch());

// Collect objects keys that need to be deleted from `transaction_effects`.
let mut live_object_keys_to_prune = vec![];
@@ -188,9 +193,19 @@
"Pruning object {:?} versions {:?} - {:?}",
object_id, min_version, max_version
);
let start_range = ObjectKey(object_id, min_version);
let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
match pruner_db_wb {
Some(ref mut batch) => {
batch.insert_batch(
&pruner_db.expect("invariant checked").object_tombstones,
std::iter::once((object_id, max_version)),
)?;
}
None => {
let start_range = ObjectKey(object_id, min_version);
let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
}
}
}

// When enable_pruning_tombstones is enabled, instead of using range deletes, we need to do a scan of all the keys
@@ -226,6 +241,9 @@
let _locks = objects_lock_table
.acquire_locks(indirect_objects.into_keys())
.await;
if let Some(batch) = pruner_db_wb {
batch.write()?;
}
wb.write()?;
Ok(())
}
@@ -314,6 +332,7 @@
checkpoint_store: &Arc<CheckpointStore>,
rpc_index: Option<&RpcIndexStore>,
objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
pruner_db: Option<&Arc<AuthorityPrunerTables>>,
config: AuthorityStorePruningConfig,
metrics: Arc<AuthorityStorePruningMetrics>,
indirect_objects_threshold: usize,
@@ -339,6 +358,7 @@
perpetual_db,
checkpoint_store,
rpc_index,
pruner_db,
PruningMode::Objects,
config.num_epochs_to_retain,
pruned_checkpoint_number,
@@ -356,6 +376,7 @@
checkpoint_store: &Arc<CheckpointStore>,
rpc_index: Option<&RpcIndexStore>,
objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
pruner_db: Option<&Arc<AuthorityPrunerTables>>,
config: AuthorityStorePruningConfig,
metrics: Arc<AuthorityStorePruningMetrics>,
indirect_objects_threshold: usize,
@@ -397,6 +418,7 @@
perpetual_db,
checkpoint_store,
rpc_index,
pruner_db,
PruningMode::Checkpoints,
config
.num_epochs_to_retain_for_checkpoints()
@@ -416,6 +438,7 @@
perpetual_db: &Arc<AuthorityPerpetualTables>,
checkpoint_store: &Arc<CheckpointStore>,
rpc_index: Option<&RpcIndexStore>,
pruner_db: Option<&Arc<AuthorityPrunerTables>>,
mode: PruningMode,
num_epochs_to_retain: u64,
starting_checkpoint_number: CheckpointSequenceNumber,
@@ -482,6 +505,7 @@
effects_to_prune,
perpetual_db,
objects_lock_table,
pruner_db,
checkpoint_number,
metrics.clone(),
indirect_objects_threshold,
@@ -515,6 +539,7 @@
effects_to_prune,
perpetual_db,
objects_lock_table,
pruner_db,
checkpoint_number,
metrics.clone(),
indirect_objects_threshold,
@@ -625,6 +650,7 @@
checkpoint_store: Arc<CheckpointStore>,
rpc_index: Option<Arc<RpcIndexStore>>,
objects_lock_table: Arc<RwLockTable<ObjectContentDigest>>,
pruner_db: Option<Arc<AuthorityPrunerTables>>,
metrics: Arc<AuthorityStorePruningMetrics>,
indirect_objects_threshold: usize,
archive_readers: ArchiveReaderBalancer,
@@ -685,12 +711,12 @@
loop {
tokio::select! {
_ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), &objects_lock_table, config.clone(), metrics.clone(), indirect_objects_threshold, epoch_duration_ms).await {
if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), &objects_lock_table, pruner_db.as_ref(), config.clone(), metrics.clone(), indirect_objects_threshold, epoch_duration_ms).await {
error!("Failed to prune objects: {:?}", err);
}
},
_ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), &objects_lock_table, config.clone(), metrics.clone(), indirect_objects_threshold, archive_readers.clone(), epoch_duration_ms).await {
if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rpc_index.as_deref(), &objects_lock_table, pruner_db.as_ref(), config.clone(), metrics.clone(), indirect_objects_threshold, archive_readers.clone(), epoch_duration_ms).await {
error!("Failed to prune checkpoints: {:?}", err);
}
},
@@ -712,6 +738,7 @@
registry: &Registry,
indirect_objects_threshold: usize,
archive_readers: ArchiveReaderBalancer,
pruner_db: Option<Arc<AuthorityPrunerTables>>,
) -> Self {
if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
{
@@ -731,6 +758,7 @@
checkpoint_store,
rpc_index,
objects_lock_table,
pruner_db,
AuthorityStorePruningMetrics::new(registry),
indirect_objects_threshold,
archive_readers,
@@ -746,6 +774,74 @@
}
}

#[derive(Clone)]
pub struct ObjectsCompactionFilter {
db: Weak<AuthorityPrunerTables>,
metrics: Arc<ObjectCompactionMetrics>,
}

impl ObjectsCompactionFilter {
pub fn new(db: Arc<AuthorityPrunerTables>, registry: &Registry) -> Self {
Self {
db: Arc::downgrade(&db),
metrics: ObjectCompactionMetrics::new(registry),
}
}
pub fn filter(&mut self, key: &[u8], value: &[u8]) -> anyhow::Result<Decision> {
let ObjectKey(object_id, version) = bincode::DefaultOptions::new()
.with_big_endian()
.with_fixint_encoding()
.deserialize(key)?;
let object: StoreObjectWrapper = bcs::from_bytes(value)?;
if matches!(object.into_inner(), StoreObject::Value(_)) {
if let Some(db) = self.db.upgrade() {
match db.object_tombstones.get(&object_id)? {
Some(gc_version) => {
if version <= gc_version {
self.metrics.key_removed.inc();
return Ok(Decision::Remove);
}
self.metrics.key_kept.inc();
}
None => self.metrics.key_not_found.inc(),
}
}
}
Ok(Decision::Keep)
}
}
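The filter consults the pruner's object_tombstones table, where prune_objects records the highest pruned version per object id; during compaction, any live-object key at or below that watermark is dropped, while newer versions and objects without a tombstone are kept. Below is a minimal, self-contained sketch of that decision rule; the Decision enum, the fixed-size object id, and the in-memory map standing in for object_tombstones are illustrative stand-ins rather than the typed_store/rocksdb types used in the actual filter.

use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Decision {
    Keep,
    Remove,
}

// Mirror of the rule in ObjectsCompactionFilter::filter: drop a key only when a
// tombstone exists for its object id and the key's version is at or below the
// recorded garbage-collection watermark.
fn decide(tombstones: &HashMap<[u8; 32], u64>, object_id: &[u8; 32], version: u64) -> Decision {
    match tombstones.get(object_id) {
        Some(&gc_version) if version <= gc_version => Decision::Remove,
        _ => Decision::Keep,
    }
}

fn main() {
    let mut tombstones = HashMap::new();
    let id = [1u8; 32];
    // prune_objects recorded max_version = 5 for this object.
    tombstones.insert(id, 5u64);
    assert_eq!(decide(&tombstones, &id, 3), Decision::Remove); // old version is compacted away
    assert_eq!(decide(&tombstones, &id, 6), Decision::Keep); // newer live version survives
    assert_eq!(decide(&tombstones, &[2u8; 32], 7), Decision::Keep); // no tombstone recorded
}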

struct ObjectCompactionMetrics {
key_removed: IntCounter,
key_kept: IntCounter,
key_not_found: IntCounter,
}

impl ObjectCompactionMetrics {
pub fn new(registry: &Registry) -> Arc<Self> {
Arc::new(Self {
key_removed: register_int_counter_with_registry!(
"key_removed",
"Compaction key removed",
registry
)
.unwrap(),
key_kept: register_int_counter_with_registry!(
"key_kept",
"Compaction key kept",
registry
)
.unwrap(),
key_not_found: register_int_counter_with_registry!(
"key_not_found",
"Compaction key not found",
registry
)
.unwrap(),
})
}
}

#[cfg(test)]
mod tests {
use more_asserts as ma;
@@ -921,6 +1017,7 @@ mod tests {
vec![effects],
&db,
&lock_table(),
None,
0,
metrics,
indirect_object_threshold,
@@ -1047,6 +1144,7 @@
vec![effects],
&perpetual_db,
&lock_table(),
None,
0,
metrics,
0,
@@ -1169,6 +1267,7 @@ mod pprof_tests {
vec![effects],
&perpetual_db,
&tests::lock_table(),
None,
0,
metrics,
1,
@@ -1207,6 +1306,7 @@
vec![effects],
&perpetual_db,
&lock_table(),
None,
0,
metrics,
1,