From 29a920cdbea97e9ac1c694effd3901ce78b44c27 Mon Sep 17 00:00:00 2001 From: Will Yang Date: Fri, 11 Oct 2024 18:12:22 -0700 Subject: [PATCH] will return to this --- crates/sui-indexer/src/handlers/pruner.rs | 159 ++++++------------ crates/sui-indexer/src/models/watermarks.rs | 22 ++- .../sui-indexer/src/store/pg_indexer_store.rs | 2 +- 3 files changed, 67 insertions(+), 116 deletions(-) diff --git a/crates/sui-indexer/src/handlers/pruner.rs b/crates/sui-indexer/src/handlers/pruner.rs index a7330e6b9fb1d1..22cc1e3275ff27 100644 --- a/crates/sui-indexer/src/handlers/pruner.rs +++ b/crates/sui-indexer/src/handlers/pruner.rs @@ -187,95 +187,75 @@ impl Pruner { tokio::time::sleep(Duration::from_millis(watermark.prune_delay(1000))).await; + let Some((mut prune_min, prune_max)) = watermark.prunable_between() else { + continue; + }; + // Prune as an epoch-partitioned table if table_partitions.get(watermark.entity.as_ref()).is_some() { - let mut prune_start = watermark.pruner_lo(); - while prune_start < watermark.epoch_lo { + while prune_min <= prune_max { if cancel.is_cancelled() { info!("Pruner task cancelled."); return Ok(()); } self.partition_manager - .drop_table_partition( - watermark.entity.as_ref().to_string(), - prune_start, - ) + .drop_table_partition(watermark.entity.as_ref().to_string(), prune_min) .await?; info!( "Batch dropped table partition {} epoch {}", - watermark.entity, prune_start + watermark.entity, prune_min ); - prune_start += 1; + prune_min += 1; - // Then need to update the `pruned_lo` + // Then need to update the `pruner_lo` self.store - .update_watermark_latest_pruned(watermark.entity.clone(), prune_start) + .update_watermark_latest_pruned(watermark.entity.clone(), prune_min) .await?; } } else { // Dealing with an unpartitioned table - if watermark.is_prunable() { - match watermark.entity { - PrunableTable::ObjectsHistory - | PrunableTable::Transactions - | PrunableTable::Events => {} - PrunableTable::EventEmitPackage - | PrunableTable::EventEmitModule - | PrunableTable::EventSenders - | PrunableTable::EventStructInstantiation - | PrunableTable::EventStructModule - | PrunableTable::EventStructName - | PrunableTable::EventStructPackage => { - self.store - .prune_event_indices_table( - watermark.pruner_lo(), - watermark.reader_lo - 1, - ) - .await?; - } - PrunableTable::TxAffectedAddresses - | PrunableTable::TxAffectedObjects - | PrunableTable::TxCallsPkg - | PrunableTable::TxCallsMod - | PrunableTable::TxCallsFun - | PrunableTable::TxChangedObjects - | PrunableTable::TxDigests - | PrunableTable::TxInputObjects - | PrunableTable::TxKinds - | PrunableTable::TxRecipients - | PrunableTable::TxSenders => { - self.store - .prune_tx_indices_table( - watermark.pruner_lo(), - watermark.reader_lo - 1, - ) - .await?; - } - PrunableTable::Checkpoints => { - self.store - .prune_cp_tx_table( - watermark.pruner_lo(), - watermark.reader_lo - 1, - ) - .await?; - } - PrunableTable::PrunerCpWatermark => { - self.store - .prune_cp_tx_table( - watermark.pruner_lo(), - watermark.reader_lo - 1, - ) - .await?; - } - } - self.store - .update_watermark_latest_pruned( - watermark.entity.clone(), - watermark.reader_lo - 1, - ) - .await?; + match watermark.entity { + PrunableTable::ObjectsHistory + | PrunableTable::Transactions + | PrunableTable::Events => {} + PrunableTable::EventEmitPackage + | PrunableTable::EventEmitModule + | PrunableTable::EventSenders + | PrunableTable::EventStructInstantiation + | PrunableTable::EventStructModule + | PrunableTable::EventStructName + | PrunableTable::EventStructPackage => { + self.store + .prune_event_indices_table(prune_min, prune_max) + .await?; + } + PrunableTable::TxAffectedAddresses + | PrunableTable::TxAffectedObjects + | PrunableTable::TxCallsPkg + | PrunableTable::TxCallsMod + | PrunableTable::TxCallsFun + | PrunableTable::TxChangedObjects + | PrunableTable::TxDigests + | PrunableTable::TxInputObjects + | PrunableTable::TxKinds + | PrunableTable::TxRecipients + | PrunableTable::TxSenders => { + self.store + .prune_tx_indices_table(prune_min, prune_max) + .await?; + } + PrunableTable::Checkpoints => { + self.store.prune_cp_tx_table(prune_min, prune_max).await?; + } + PrunableTable::PrunerCpWatermark => { + self.store.prune_cp_tx_table(prune_min, prune_max).await?; + } } + + self.store + .update_watermark_latest_pruned(watermark.entity.clone(), prune_max) + .await?; } } } @@ -359,45 +339,6 @@ async fn prune_table(store: &PgIndexerStore, watermark: &PrunableWatermark) -> I }; // prune up to and including pruner_lo - match watermark.entity { - PrunableTable::ObjectsHistory | PrunableTable::Transactions | PrunableTable::Events => {} - PrunableTable::EventEmitPackage - | PrunableTable::EventEmitModule - | PrunableTable::EventSenders - | PrunableTable::EventStructInstantiation - | PrunableTable::EventStructModule - | PrunableTable::EventStructName - | PrunableTable::EventStructPackage => { - store - .prune_event_indices_table(watermark.pruner_lo(), pruner_hi_inclusive) - .await?; - } - PrunableTable::TxAffectedAddresses - | PrunableTable::TxAffectedObjects - | PrunableTable::TxCallsPkg - | PrunableTable::TxCallsMod - | PrunableTable::TxCallsFun - | PrunableTable::TxChangedObjects - | PrunableTable::TxDigests - | PrunableTable::TxInputObjects - | PrunableTable::TxKinds - | PrunableTable::TxRecipients - | PrunableTable::TxSenders => { - store - .prune_tx_indices_table(watermark.pruner_lo(), pruner_hi_inclusive) - .await?; - } - PrunableTable::Checkpoints => { - store - .prune_cp_tx_table(watermark.pruner_lo(), pruner_hi_inclusive) - .await?; - } - PrunableTable::PrunerCpWatermark => { - store - .prune_cp_tx_table(watermark.pruner_lo(), pruner_hi_inclusive) - .await?; - } - } // and then update pruner_lo to reader_lo - 1 store diff --git a/crates/sui-indexer/src/models/watermarks.rs b/crates/sui-indexer/src/models/watermarks.rs index 290005bea6b021..badd1e8ee68859 100644 --- a/crates/sui-indexer/src/models/watermarks.rs +++ b/crates/sui-indexer/src/models/watermarks.rs @@ -53,7 +53,7 @@ pub struct PrunableWatermark { pub timestamp_ms: i64, /// Latest timestamp read from db. pub current_timestamp_ms: i64, - /// Data at and below `pruned_lo` is considered pruned by the pruner. + /// Data at and below `pruner_lo` is considered pruned by the pruner. pub pruner_lo: Option, } @@ -105,11 +105,21 @@ impl PrunableWatermark { (self.timestamp_ms + delay as i64 - self.current_timestamp_ms).max(0) as u64 } - /// Check if unpartitioned table is prunable - pub fn is_prunable(&self) -> bool { - match self.pruned_lo { - None => self.reader_lo > 0, - Some(pruned_lo) => self.reader_lo > pruned_lo + 1, + pub fn prunable_between(&self) -> Option<(u64, u64)> { + if self.pruner_hi() <= 0 { + return None; + } + + let prune_max = self.pruner_hi() - 1; + + let Some(prune_min) = self.pruner_lo() else { + return Some((0, prune_max)); + }; + + if prune_min < prune_max { + Some((prune_min, prune_max)) + } else { + None } } } diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 7de7953e27dd26..e4103e7ca3554e 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -1761,7 +1761,7 @@ impl PgIndexerStore { async { diesel::update(watermarks::table) .filter(watermarks::entity.eq(table.as_ref())) - .set((watermarks::pruned_lo.eq(latest_pruned as i64),)) + .set((watermarks::pruner_lo.eq(latest_pruned as i64),)) .execute(conn) .await?;