diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql index 73d7a4dcf16f52..57dd6e130954de 100644 --- a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql +++ b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql @@ -22,8 +22,5 @@ CREATE TABLE watermarks -- be dropped. The pruner uses this column to determine whether to prune or wait long enough -- that all in-flight reads complete or timeout before it acts on an updated watermark. timestamp_ms BIGINT NOT NULL, - -- Updated and used by the pruner. Data up to and excluding this watermark can be immediately - -- dropped. Data between this and `reader_lo` can be pruned after a delay. - pruner_lo BIGINT, PRIMARY KEY (entity) ); diff --git a/crates/sui-indexer/src/handlers/pruner.rs b/crates/sui-indexer/src/handlers/pruner.rs index d298962af93cd9..85b6faa12f071d 100644 --- a/crates/sui-indexer/src/handlers/pruner.rs +++ b/crates/sui-indexer/src/handlers/pruner.rs @@ -11,7 +11,6 @@ use tracing::{error, info}; use crate::config::RetentionConfig; use crate::errors::IndexerError; -use crate::models::watermarks::PrunableWatermark; use crate::store::pg_partition_manager::PgPartitionManager; use crate::store::PgIndexerStore; use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult}; @@ -104,15 +103,6 @@ impl PrunableTable { PrunableTable::PrunerCpWatermark => cp, } } - - pub fn select_pruner_lo(&self, epoch_lo: u64, reader_lo: u64) -> u64 { - match self { - PrunableTable::ObjectsHistory => epoch_lo, - PrunableTable::Transactions => epoch_lo, - PrunableTable::Events => epoch_lo, - _ => reader_lo, - } - } } impl Pruner { @@ -261,7 +251,7 @@ async fn update_watermarks_lower_bounds( retention_policies: &HashMap, cancel: &CancellationToken, ) -> IndexerResult<()> { - let (watermarks, latest_db_timestamp) = store.get_watermarks().await?; + let (watermarks, _) = store.get_watermarks().await?; let mut lower_bound_updates = vec![]; for watermark in watermarks.iter() { @@ -270,21 +260,21 @@ async fn update_watermarks_lower_bounds( return Ok(()); } - let Some(watermark) = PrunableWatermark::new(watermark.clone(), latest_db_timestamp) else { + let Some(prunable_table) = watermark.entity() else { continue; }; - let Some(epochs_to_keep) = retention_policies.get(&watermark.entity) else { + let Some(epochs_to_keep) = retention_policies.get(&prunable_table) else { + error!( + "No retention policy found for prunable table {}", + prunable_table + ); continue; }; - if watermark.epoch_lo + epochs_to_keep <= watermark.epoch_hi_inclusive { - let new_epoch_lo = watermark - .epoch_hi_inclusive - .saturating_sub(epochs_to_keep - 1); - - lower_bound_updates.push((watermark, new_epoch_lo)); - } + if let Some(new_epoch_lo) = watermark.new_epoch_lo(*epochs_to_keep) { + lower_bound_updates.push((prunable_table, new_epoch_lo)); + }; } if !lower_bound_updates.is_empty() { diff --git a/crates/sui-indexer/src/models/watermarks.rs b/crates/sui-indexer/src/models/watermarks.rs index 06734371d15dcd..e3e815a16a3d42 100644 --- a/crates/sui-indexer/src/models/watermarks.rs +++ b/crates/sui-indexer/src/models/watermarks.rs @@ -36,70 +36,6 @@ pub struct StoredWatermark { /// be dropped. The pruner uses this column to determine whether to prune or wait long enough /// that all in-flight reads complete or timeout before it acts on an updated watermark. pub timestamp_ms: i64, - /// Updated and used by the pruner. Data up to and excluding this watermark can be immediately - /// dropped. Data between this and `reader_lo` can be pruned after a delay. - pub pruner_lo: Option, -} - -#[derive(Debug)] -pub struct PrunableWatermark { - pub entity: PrunableTable, - pub epoch_hi_inclusive: u64, - pub epoch_lo: u64, - pub checkpoint_hi_inclusive: u64, - pub tx_hi_inclusive: u64, - pub reader_lo: u64, - /// Timestamp when the watermark's lower bound was last updated. - pub timestamp_ms: i64, - /// Latest timestamp read from db. - pub current_timestamp_ms: i64, - /// Data at and below `pruned_lo` is considered pruned by the pruner. - pub pruner_lo: Option, -} - -impl PrunableWatermark { - pub fn new(stored: StoredWatermark, latest_db_timestamp: i64) -> Option { - let entity = PrunableTable::from_str(&stored.entity).ok()?; - - Some(PrunableWatermark { - entity, - epoch_hi_inclusive: stored.epoch_hi_inclusive as u64, - epoch_lo: stored.epoch_lo as u64, - checkpoint_hi_inclusive: stored.checkpoint_hi_inclusive as u64, - tx_hi_inclusive: stored.tx_hi_inclusive as u64, - reader_lo: stored.reader_lo as u64, - timestamp_ms: stored.timestamp_ms, - current_timestamp_ms: latest_db_timestamp, - pruner_lo: stored.pruner_lo.map(|lo| lo as u64), - }) - } - - pub fn update(&mut self, new_epoch_lo: u64, new_reader_lo: u64) { - self.pruner_lo = Some(match self.entity { - PrunableTable::ObjectsHistory => self.epoch_lo, - PrunableTable::Transactions => self.epoch_lo, - PrunableTable::Events => self.epoch_lo, - _ => self.reader_lo, - }); - - self.epoch_lo = new_epoch_lo; - self.reader_lo = new_reader_lo; - } - - /// Represents the exclusive upper bound of data that can be pruned immediately. - pub fn immediately_prunable_hi(&self) -> Option { - self.pruner_lo - } - - /// Represents the lower bound of data that can be pruned after a delay. - pub fn delayed_prunable_lo(&self) -> Option { - self.pruner_lo - } - - /// The new `pruner_lo` is the current reader_lo, or epoch_lo for epoch-partitioned tables. - pub fn new_pruner_lo(&self) -> u64 { - self.entity.select_pruner_lo(self.epoch_lo, self.reader_lo) - } } impl StoredWatermark { @@ -113,18 +49,25 @@ impl StoredWatermark { } } - pub fn from_lower_bound_update( - entity: &str, - epoch_lo: u64, - reader_lo: u64, - pruner_lo: u64, - ) -> Self { + pub fn from_lower_bound_update(entity: &str, epoch_lo: u64, reader_lo: u64) -> Self { StoredWatermark { entity: entity.to_string(), epoch_lo: epoch_lo as i64, reader_lo: reader_lo as i64, - pruner_lo: Some(pruner_lo as i64), ..StoredWatermark::default() } } + + pub fn entity(&self) -> Option { + PrunableTable::from_str(&self.entity).ok() + } + + /// Determine whether to set a new epoch lower bound based on the retention policy. + pub fn new_epoch_lo(&self, retention: u64) -> Option { + if self.epoch_lo as u64 + retention <= self.epoch_hi_inclusive as u64 { + Some((self.epoch_hi_inclusive as u64).saturating_sub(retention - 1)) + } else { + None + } + } } diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index a2c418db8e042a..7f5260c53eb5f3 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -378,7 +378,6 @@ diesel::table! { tx_hi_inclusive -> Int8, reader_lo -> Int8, timestamp_ms -> Int8, - pruner_lo -> Nullable, } } diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index 682a0a2fc84680..998b37f286b1e5 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -7,12 +7,13 @@ use async_trait::async_trait; use strum::IntoEnumIterator; use crate::errors::IndexerError; +use crate::handlers::pruner::PrunableTable; use crate::handlers::{CommitterWatermark, EpochToCommit, TransactionObjectChangesToCommit}; use crate::models::display::StoredDisplay; use crate::models::obj_indices::StoredObjectVersion; use crate::models::objects::{StoredDeletedObject, StoredObject}; use crate::models::raw_checkpoints::StoredRawCheckpoint; -use crate::models::watermarks::{PrunableWatermark, StoredWatermark}; +use crate::models::watermarks::StoredWatermark; use crate::types::{ EventIndex, IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex, }; @@ -131,7 +132,7 @@ pub trait IndexerStore: Clone + Sync + Send + 'static { /// bounds. async fn update_watermarks_lower_bound( &self, - watermarks: Vec<(PrunableWatermark, u64)>, + watermarks: Vec<(PrunableTable, u64)>, ) -> Result<(), IndexerError>; /// Load all watermark entries from the store, and the latest timestamp from the db. diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index bef9001dda9cdb..aa9c98eb59d2cb 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -28,6 +28,7 @@ use sui_storage::object_store::util::put; use crate::config::UploadOptions; use crate::database::ConnectionPool; use crate::errors::{Context, IndexerError}; +use crate::handlers::pruner::PrunableTable; use crate::handlers::TransactionObjectChangesToCommit; use crate::handlers::{CommitterWatermark, EpochToCommit}; use crate::metrics::IndexerMetrics; @@ -45,7 +46,7 @@ use crate::models::objects::{ }; use crate::models::packages::StoredPackage; use crate::models::transactions::StoredTransaction; -use crate::models::watermarks::{PrunableWatermark, StoredWatermark}; +use crate::models::watermarks::StoredWatermark; use crate::schema::{ chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package, event_senders, event_struct_instantiation, event_struct_module, event_struct_name, @@ -1584,7 +1585,7 @@ impl PgIndexerStore { async fn update_watermarks_lower_bound( &self, - watermarks: Vec<(PrunableWatermark, u64)>, + watermarks: Vec<(PrunableTable, u64)>, ) -> Result<(), IndexerError> { use diesel_async::RunQueryDsl; @@ -1592,7 +1593,7 @@ impl PgIndexerStore { let epoch_mapping = self.map_epochs_to_cp_tx(&epochs).await?; let lookups: Result, IndexerError> = watermarks .into_iter() - .map(|(watermark, epoch)| { + .map(|(table, epoch)| { let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| { IndexerError::PersistentStorageDataCorruptionError(format!( "Epoch {} not found in epoch mapping", @@ -1601,10 +1602,9 @@ impl PgIndexerStore { })?; Ok(StoredWatermark::from_lower_bound_update( - watermark.entity.as_ref(), + table.as_ref(), epoch, - watermark.entity.select_reader_lo(*checkpoint, *tx), - watermark.new_pruner_lo(), + table.select_reader_lo(*checkpoint, *tx), )) }) .collect(); @@ -1627,7 +1627,6 @@ impl PgIndexerStore { .set(( watermarks::reader_lo.eq(excluded(watermarks::reader_lo)), watermarks::epoch_lo.eq(excluded(watermarks::epoch_lo)), - watermarks::pruner_lo.eq(excluded(watermarks::pruner_lo)), watermarks::timestamp_ms.eq(sql::( "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint", )), @@ -2387,7 +2386,7 @@ impl IndexerStore for PgIndexerStore { async fn update_watermarks_lower_bound( &self, - watermarks: Vec<(PrunableWatermark, u64)>, + watermarks: Vec<(PrunableTable, u64)>, ) -> Result<(), IndexerError> { self.update_watermarks_lower_bound(watermarks).await }