diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql index 1eaeadb4866f7..e8499c692120c 100644 --- a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql +++ b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql @@ -4,16 +4,16 @@ CREATE TABLE watermarks entity TEXT NOT NULL, -- Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses -- this to determine if pruning is necessary based on the retention policy. - epoch_hi BIGINT NOT NULL, + epoch_hi_inclusive BIGINT NOT NULL, -- Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy. epoch_lo BIGINT NOT NULL, -- Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All -- data of this entity in the checkpoint must be persisted before advancing this watermark. The -- committer refers to this on disaster recovery to resume writing. - checkpoint_hi BIGINT NOT NULL, + checkpoint_hi_inclusive BIGINT NOT NULL, -- Inclusive upper transaction sequence number bound for this entity's data. Committer updates -- this field. - tx_hi BIGINT NOT NULL, + tx_hi_inclusive BIGINT NOT NULL, -- Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint -- sequence number, or tx sequence number depending on the entity. Data before this watermark is -- considered pruned by a reader. The underlying data may still exist in the db instance. @@ -22,9 +22,8 @@ CREATE TABLE watermarks -- be dropped. The pruner uses this column to determine whether to prune or wait long enough -- that all in-flight reads complete or timeout before it acts on an updated watermark. timestamp_ms BIGINT NOT NULL, - -- Column used by the pruner to track its true progress. Data at and below this watermark has - -- been truly pruned from the db, and should no longer exist. When recovering from a crash, the - -- pruner will consult this column to determine where to continue. - pruned_lo BIGINT, + -- Column used by the pruner to track its true progress. Data at and below this watermark can + -- be immediately pruned. + pruner_lo BIGINT, PRIMARY KEY (entity) ); diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index de8d89a56ae1f..f67f2fad6f007 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -219,17 +219,6 @@ async fn commit_checkpoints( }) .expect("Persisting data into DB should not fail."); - state - .update_watermarks_upper_bound::(committer_watermark) - .await - .tap_err(|e| { - error!( - "Failed to update watermark upper bound with error: {}", - e.to_string() - ); - }) - .expect("Updating watermark upper bound in DB should not fail."); - if is_epoch_end { // The epoch has advanced so we update the configs for the new protocol version, if it has changed. let chain_id = state @@ -242,6 +231,17 @@ async fn commit_checkpoints( .await; } + state + .update_watermarks_upper_bound::(committer_watermark) + .await + .tap_err(|e| { + error!( + "Failed to update watermark upper bound with error: {}", + e.to_string() + ); + }) + .expect("Updating watermark upper bound in DB should not fail."); + let elapsed = guard.stop_and_record(); info!( diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index 3f9d0357c233a..75b64394f1ba3 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -195,7 +195,7 @@ impl From<&CheckpointData> for CommitterWatermark { } } -/// Enum representing tables that a committer updates. +/// Enum representing tables that the committer handler writes to. #[derive( Debug, Eq, @@ -253,7 +253,7 @@ pub enum CommitterTables { PrunerCpWatermark, } -/// Enum representing tables that the objects snapshot processor updates. +/// Enum representing tables that the objects snapshot handler writes to. #[derive( Debug, Eq, diff --git a/crates/sui-indexer/src/models/watermarks.rs b/crates/sui-indexer/src/models/watermarks.rs index b4cb8032c7e0d..e3c2395fec636 100644 --- a/crates/sui-indexer/src/models/watermarks.rs +++ b/crates/sui-indexer/src/models/watermarks.rs @@ -15,16 +15,16 @@ pub struct StoredWatermark { pub entity: String, /// Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses /// this to determine if pruning is necessary based on the retention policy. - pub epoch_hi: i64, + pub epoch_hi_inclusive: i64, /// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy. pub epoch_lo: i64, /// Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All /// data of this entity in the checkpoint must be persisted before advancing this watermark. The /// committer refers to this on disaster recovery to resume writing. - pub checkpoint_hi: i64, + pub checkpoint_hi_inclusive: i64, /// Inclusive upper transaction sequence number bound for this entity's data. Committer updates /// this field. - pub tx_hi: i64, + pub tx_hi_inclusive: i64, /// Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint /// sequence number, or tx sequence number depending on the entity. Data before this watermark is /// considered pruned by a reader. The underlying data may still exist in the db instance. @@ -33,19 +33,18 @@ pub struct StoredWatermark { /// be dropped. The pruner uses this column to determine whether to prune or wait long enough /// that all in-flight reads complete or timeout before it acts on an updated watermark. pub timestamp_ms: i64, - /// Column used by the pruner to track its true progress. Data at and below this watermark has - /// been truly pruned from the db, and should no longer exist. When recovering from a crash, the - /// pruner will consult this column to determine where to continue. - pub pruned_lo: Option, + /// Column used by the pruner to track its true progress. Data at and below this watermark can + /// be immediately pruned. + pub pruner_lo: Option, } impl StoredWatermark { pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self { StoredWatermark { entity: entity.to_string(), - epoch_hi: watermark.epoch as i64, - checkpoint_hi: watermark.cp as i64, - tx_hi: watermark.tx as i64, + epoch_hi_inclusive: watermark.epoch as i64, + checkpoint_hi_inclusive: watermark.cp as i64, + tx_hi_inclusive: watermark.tx as i64, ..StoredWatermark::default() } } diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index 31c67932cd8cd..f56872fc80e90 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -371,13 +371,13 @@ diesel::table! { diesel::table! { watermarks (entity) { entity -> Text, - epoch_hi -> Int8, + epoch_hi_inclusive -> Int8, epoch_lo -> Int8, - checkpoint_hi -> Int8, - tx_hi -> Int8, + checkpoint_hi_inclusive -> Int8, + tx_hi_inclusive -> Int8, reader_lo -> Int8, timestamp_ms -> Int8, - pruned_lo -> Nullable, + pruner_lo -> Nullable, } } diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 2fb57825d4442..5c724e17d9b5c 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -298,7 +298,7 @@ impl PgIndexerStore { let mut connection = self.pool.get().await?; watermarks::table - .select(watermarks::checkpoint_hi) + .select(watermarks::checkpoint_hi_inclusive) .filter(watermarks::entity.eq("objects_snapshot")) .first::(&mut connection) .await @@ -1604,9 +1604,10 @@ impl PgIndexerStore { .on_conflict(watermarks::entity) .do_update() .set(( - watermarks::epoch_hi.eq(excluded(watermarks::epoch_hi)), - watermarks::checkpoint_hi.eq(excluded(watermarks::checkpoint_hi)), - watermarks::tx_hi.eq(excluded(watermarks::tx_hi)), + watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)), + watermarks::checkpoint_hi_inclusive + .eq(excluded(watermarks::checkpoint_hi_inclusive)), + watermarks::tx_hi_inclusive.eq(excluded(watermarks::tx_hi_inclusive)), )) .execute(conn) .await