Skip to content

Commit

Permalink
will return to this
Browse files Browse the repository at this point in the history
  • Loading branch information
wlmyng committed Oct 15, 2024
1 parent bc0502d commit 29a920c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 116 deletions.
159 changes: 50 additions & 109 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,95 +187,75 @@ impl Pruner {

tokio::time::sleep(Duration::from_millis(watermark.prune_delay(1000))).await;

let Some((mut prune_min, prune_max)) = watermark.prunable_between() else {
continue;
};

// Prune as an epoch-partitioned table
if table_partitions.get(watermark.entity.as_ref()).is_some() {
let mut prune_start = watermark.pruner_lo();
while prune_start < watermark.epoch_lo {
while prune_min <= prune_max {
if cancel.is_cancelled() {
info!("Pruner task cancelled.");
return Ok(());
}
self.partition_manager
.drop_table_partition(
watermark.entity.as_ref().to_string(),
prune_start,
)
.drop_table_partition(watermark.entity.as_ref().to_string(), prune_min)
.await?;
info!(
"Batch dropped table partition {} epoch {}",
watermark.entity, prune_start
watermark.entity, prune_min
);
prune_start += 1;
prune_min += 1;

// Then need to update the `pruned_lo`
// Then need to update the `pruner_lo`
self.store
.update_watermark_latest_pruned(watermark.entity.clone(), prune_start)
.update_watermark_latest_pruned(watermark.entity.clone(), prune_min)
.await?;
}
} else {
// Dealing with an unpartitioned table
if watermark.is_prunable() {
match watermark.entity {
PrunableTable::ObjectsHistory
| PrunableTable::Transactions
| PrunableTable::Events => {}
PrunableTable::EventEmitPackage
| PrunableTable::EventEmitModule
| PrunableTable::EventSenders
| PrunableTable::EventStructInstantiation
| PrunableTable::EventStructModule
| PrunableTable::EventStructName
| PrunableTable::EventStructPackage => {
self.store
.prune_event_indices_table(
watermark.pruner_lo(),
watermark.reader_lo - 1,
)
.await?;
}
PrunableTable::TxAffectedAddresses
| PrunableTable::TxAffectedObjects
| PrunableTable::TxCallsPkg
| PrunableTable::TxCallsMod
| PrunableTable::TxCallsFun
| PrunableTable::TxChangedObjects
| PrunableTable::TxDigests
| PrunableTable::TxInputObjects
| PrunableTable::TxKinds
| PrunableTable::TxRecipients
| PrunableTable::TxSenders => {
self.store
.prune_tx_indices_table(
watermark.pruner_lo(),
watermark.reader_lo - 1,
)
.await?;
}
PrunableTable::Checkpoints => {
self.store
.prune_cp_tx_table(
watermark.pruner_lo(),
watermark.reader_lo - 1,
)
.await?;
}
PrunableTable::PrunerCpWatermark => {
self.store
.prune_cp_tx_table(
watermark.pruner_lo(),
watermark.reader_lo - 1,
)
.await?;
}
}

self.store
.update_watermark_latest_pruned(
watermark.entity.clone(),
watermark.reader_lo - 1,
)
.await?;
match watermark.entity {
PrunableTable::ObjectsHistory
| PrunableTable::Transactions
| PrunableTable::Events => {}
PrunableTable::EventEmitPackage
| PrunableTable::EventEmitModule
| PrunableTable::EventSenders
| PrunableTable::EventStructInstantiation
| PrunableTable::EventStructModule
| PrunableTable::EventStructName
| PrunableTable::EventStructPackage => {
self.store
.prune_event_indices_table(prune_min, prune_max)
.await?;
}
PrunableTable::TxAffectedAddresses
| PrunableTable::TxAffectedObjects
| PrunableTable::TxCallsPkg
| PrunableTable::TxCallsMod
| PrunableTable::TxCallsFun
| PrunableTable::TxChangedObjects
| PrunableTable::TxDigests
| PrunableTable::TxInputObjects
| PrunableTable::TxKinds
| PrunableTable::TxRecipients
| PrunableTable::TxSenders => {
self.store
.prune_tx_indices_table(prune_min, prune_max)
.await?;
}
PrunableTable::Checkpoints => {
self.store.prune_cp_tx_table(prune_min, prune_max).await?;
}
PrunableTable::PrunerCpWatermark => {
self.store.prune_cp_tx_table(prune_min, prune_max).await?;
}
}

self.store
.update_watermark_latest_pruned(watermark.entity.clone(), prune_max)
.await?;
}
}
}
Expand Down Expand Up @@ -359,45 +339,6 @@ async fn prune_table(store: &PgIndexerStore, watermark: &PrunableWatermark) -> I
};

// prune up to and including pruner_lo
match watermark.entity {
PrunableTable::ObjectsHistory | PrunableTable::Transactions | PrunableTable::Events => {}
PrunableTable::EventEmitPackage
| PrunableTable::EventEmitModule
| PrunableTable::EventSenders
| PrunableTable::EventStructInstantiation
| PrunableTable::EventStructModule
| PrunableTable::EventStructName
| PrunableTable::EventStructPackage => {
store
.prune_event_indices_table(watermark.pruner_lo(), pruner_hi_inclusive)
.await?;
}
PrunableTable::TxAffectedAddresses
| PrunableTable::TxAffectedObjects
| PrunableTable::TxCallsPkg
| PrunableTable::TxCallsMod
| PrunableTable::TxCallsFun
| PrunableTable::TxChangedObjects
| PrunableTable::TxDigests
| PrunableTable::TxInputObjects
| PrunableTable::TxKinds
| PrunableTable::TxRecipients
| PrunableTable::TxSenders => {
store
.prune_tx_indices_table(watermark.pruner_lo(), pruner_hi_inclusive)
.await?;
}
PrunableTable::Checkpoints => {
store
.prune_cp_tx_table(watermark.pruner_lo(), pruner_hi_inclusive)
.await?;
}
PrunableTable::PrunerCpWatermark => {
store
.prune_cp_tx_table(watermark.pruner_lo(), pruner_hi_inclusive)
.await?;
}
}

// and then update pruner_lo to reader_lo - 1
store
Expand Down
22 changes: 16 additions & 6 deletions crates/sui-indexer/src/models/watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct PrunableWatermark {
pub timestamp_ms: i64,
/// Latest timestamp read from db.
pub current_timestamp_ms: i64,
/// Data at and below `pruned_lo` is considered pruned by the pruner.
/// Data at and below `pruner_lo` is considered pruned by the pruner.
pub pruner_lo: Option<u64>,
}

Expand Down Expand Up @@ -105,11 +105,21 @@ impl PrunableWatermark {
(self.timestamp_ms + delay as i64 - self.current_timestamp_ms).max(0) as u64
}

/// Check if unpartitioned table is prunable
pub fn is_prunable(&self) -> bool {
match self.pruned_lo {
None => self.reader_lo > 0,
Some(pruned_lo) => self.reader_lo > pruned_lo + 1,
pub fn prunable_between(&self) -> Option<(u64, u64)> {
if self.pruner_hi() <= 0 {
return None;
}

let prune_max = self.pruner_hi() - 1;

let Some(prune_min) = self.pruner_lo() else {
return Some((0, prune_max));
};

if prune_min < prune_max {
Some((prune_min, prune_max))
} else {
None
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,7 @@ impl PgIndexerStore {
async {
diesel::update(watermarks::table)
.filter(watermarks::entity.eq(table.as_ref()))
.set((watermarks::pruned_lo.eq(latest_pruned as i64),))
.set((watermarks::pruner_lo.eq(latest_pruned as i64),))
.execute(conn)
.await?;

Expand Down

0 comments on commit 29a920c

Please sign in to comment.