diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql
index 2187cadc26f32..73bdc70055246 100644
--- a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql
+++ b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql
@@ -1,29 +1,34 @@
-CREATE TABLE watermarks
+CREATE TABLE IF NOT EXISTS watermarks
 (
-    -- The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`.
-    entity TEXT NOT NULL,
-    -- Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
-    -- this to determine if pruning is necessary based on the retention policy.
+    -- The pipeline governed by this watermark, e.g. `epochs`, `checkpoints`,
+    -- `transactions`.
+    pipeline TEXT PRIMARY KEY,
+    -- Inclusive upper epoch bound for this entity's data. Committer updates
+    -- this field. Pruner uses this to determine if pruning is necessary based
+    -- on the retention policy.
     epoch_hi_inclusive BIGINT NOT NULL,
-    -- Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
-    epoch_lo BIGINT NOT NULL,
-    -- Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
-    -- data of this entity in the checkpoint must be persisted before advancing this watermark. The
-    -- committer refers to this on disaster recovery to resume writing.
+    -- Inclusive upper checkpoint bound for this entity's data. Committer
+    -- updates this field. All data of this entity in the checkpoint must be
+    -- persisted before advancing this watermark. The committer refers to this
+    -- on disaster recovery to resume writing.
     checkpoint_hi_inclusive BIGINT NOT NULL,
-    -- Inclusive upper transaction sequence number bound for this entity's data. Committer updates
-    -- this field.
-    tx_hi_inclusive BIGINT NOT NULL,
-    -- Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
-    -- sequence number, or tx sequence number depending on the entity. Data before this watermark is
-    -- considered pruned by a reader. The underlying data may still exist in the db instance.
+    -- Exclusive upper transaction sequence number bound for this entity's
+    -- data. Committer updates this field.
+    tx_hi BIGINT NOT NULL,
+    -- Inclusive lower epoch bound for this entity's data. Pruner updates this
+    -- field when the epoch range exceeds the retention policy.
+    epoch_lo BIGINT NOT NULL,
+    -- Inclusive low watermark that the pruner advances. Corresponds to the
+    -- epoch id, checkpoint sequence number, or tx sequence number depending on
+    -- the entity. Data before this watermark is considered pruned by a reader.
+    -- The underlying data may still exist in the db instance.
     reader_lo BIGINT NOT NULL,
-    -- Updated using the database's current timestamp when the pruner sees that some data needs to
-    -- be dropped. The pruner uses this column to determine whether to prune or wait long enough
-    -- that all in-flight reads complete or timeout before it acts on an updated watermark.
+    -- Updated using the database's current timestamp when the pruner sees that
+    -- some data needs to be dropped. The pruner uses this column to determine
+    -- whether to prune or wait long enough that all in-flight reads complete
+    -- or timeout before it acts on an updated watermark.
     timestamp_ms BIGINT NOT NULL,
-    -- Column used by the pruner to track its true progress. Data at and below this watermark can
-    -- be immediately pruned.
-    pruner_hi_inclusive BIGINT,
-    PRIMARY KEY (entity)
+    -- Column used by the pruner to track its true progress. Data below this
+    -- watermark can be immediately pruned.
+    pruner_hi BIGINT NOT NULL
 );
diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs
index 6c08fda947667..4d5ad05d731c5 100644
--- a/crates/sui-indexer/src/handlers/committer.rs
+++ b/crates/sui-indexer/src/handlers/committer.rs
@@ -264,19 +264,20 @@ async fn commit_checkpoints(
         elapsed,
         "Checkpoint {}-{} committed with {} transactions.",
         first_checkpoint_seq,
-        committer_watermark.cp,
+        committer_watermark.checkpoint_hi_inclusive,
         tx_count,
     );
     metrics
         .latest_tx_checkpoint_sequence_number
-        .set(committer_watermark.cp as i64);
+        .set(committer_watermark.checkpoint_hi_inclusive as i64);
     metrics
         .total_tx_checkpoint_committed
         .inc_by(checkpoint_num as u64);
     metrics.total_transaction_committed.inc_by(tx_count as u64);
-    metrics
-        .transaction_per_checkpoint
-        .observe(tx_count as f64 / (committer_watermark.cp - first_checkpoint_seq + 1) as f64);
+    metrics.transaction_per_checkpoint.observe(
+        tx_count as f64
+            / (committer_watermark.checkpoint_hi_inclusive - first_checkpoint_seq + 1) as f64,
+    );
     // 1000.0 is not necessarily the batch size, it's to roughly map average tx commit latency to [0.1, 1] seconds,
     // which is well covered by DB_COMMIT_LATENCY_SEC_BUCKETS.
     metrics
diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs
index c36b3b4d13f4a..a6c6412f3a42c 100644
--- a/crates/sui-indexer/src/handlers/mod.rs
+++ b/crates/sui-indexer/src/handlers/mod.rs
@@ -130,7 +130,7 @@ impl CommonHandler {
                             return Ok(());
                         }
                         for tuple in tuple_chunk {
-                            unprocessed.insert(tuple.0.cp, tuple);
+                            unprocessed.insert(tuple.0.checkpoint_hi_inclusive, tuple);
                         }
                     }
                     Some(None) => break, // Stream has ended
@@ -200,17 +200,20 @@ pub trait Handler: Send + Sync {
 /// will be used for a particular table.
 #[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
 pub struct CommitterWatermark {
-    pub epoch: u64,
-    pub cp: u64,
-    pub tx: u64,
+    /// Inclusive upper epoch bound.
+    pub epoch_hi_inclusive: u64,
+    /// Inclusive upper checkpoint sequence number bound.
+    pub checkpoint_hi_inclusive: u64,
+    /// Exclusive upper transaction sequence number bound.
+    pub tx_hi: u64,
 }
 
 impl From<&IndexedCheckpoint> for CommitterWatermark {
     fn from(checkpoint: &IndexedCheckpoint) -> Self {
         Self {
-            epoch: checkpoint.epoch,
-            cp: checkpoint.sequence_number,
-            tx: checkpoint.network_total_transactions.saturating_sub(1),
+            epoch_hi_inclusive: checkpoint.epoch,
+            checkpoint_hi_inclusive: checkpoint.sequence_number,
+            tx_hi: checkpoint.network_total_transactions,
         }
     }
 }
@@ -218,12 +221,9 @@ impl From<&IndexedCheckpoint> for CommitterWatermark {
 impl From<&CheckpointData> for CommitterWatermark {
     fn from(checkpoint: &CheckpointData) -> Self {
         Self {
-            epoch: checkpoint.checkpoint_summary.epoch,
-            cp: checkpoint.checkpoint_summary.sequence_number,
-            tx: checkpoint
-                .checkpoint_summary
-                .network_total_transactions
-                .saturating_sub(1),
+            epoch_hi_inclusive: checkpoint.checkpoint_summary.epoch,
+            checkpoint_hi_inclusive: checkpoint.checkpoint_summary.sequence_number,
+            tx_hi: checkpoint.checkpoint_summary.network_total_transactions,
         }
     }
 }
diff --git a/crates/sui-indexer/src/handlers/objects_snapshot_handler.rs b/crates/sui-indexer/src/handlers/objects_snapshot_handler.rs
index 960f057b54ddd..816b416fc3743 100644
--- a/crates/sui-indexer/src/handlers/objects_snapshot_handler.rs
+++ b/crates/sui-indexer/src/handlers/objects_snapshot_handler.rs
@@ -73,7 +73,7 @@ impl Handler for ObjectsSnapshotHandler {
 
         self.metrics
             .latest_object_snapshot_sequence_number
-            .set(watermark.cp as i64);
+            .set(watermark.checkpoint_hi_inclusive as i64);
 
         Ok(())
     }
diff --git a/crates/sui-indexer/src/models/watermarks.rs b/crates/sui-indexer/src/models/watermarks.rs
index 79bd014c9a500..1ff3d3cfe52ac 100644
--- a/crates/sui-indexer/src/models/watermarks.rs
+++ b/crates/sui-indexer/src/models/watermarks.rs
@@ -15,19 +15,19 @@ use crate::{
-#[diesel(table_name = watermarks, primary_key(entity))]
+#[diesel(table_name = watermarks, primary_key(pipeline))]
 pub struct StoredWatermark {
-    /// The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`.
-    pub entity: String,
+    /// The pipeline governed by this watermark, e.g. `epochs`, `checkpoints`, `transactions`.
+    pub pipeline: String,
     /// Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
     /// this to determine if pruning is necessary based on the retention policy.
     pub epoch_hi_inclusive: i64,
-    /// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
-    pub epoch_lo: i64,
     /// Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
     /// data of this entity in the checkpoint must be persisted before advancing this watermark. The
     /// committer refers to this on disaster recovery to resume writing.
     pub checkpoint_hi_inclusive: i64,
-    /// Inclusive upper transaction sequence number bound for this entity's data. Committer updates
+    /// Exclusive upper transaction sequence number bound for this entity's data. Committer updates
     /// this field.
-    pub tx_hi_inclusive: i64,
+    pub tx_hi: i64,
+    /// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
+    pub epoch_lo: i64,
     /// Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
     /// sequence number, or tx sequence number depending on the entity. Data before this watermark is
     /// considered pruned by a reader. The underlying data may still exist in the db instance.
@@ -36,25 +36,25 @@ pub struct StoredWatermark {
     /// be dropped. The pruner uses this column to determine whether to prune or wait long enough
     /// that all in-flight reads complete or timeout before it acts on an updated watermark.
     pub timestamp_ms: i64,
-    /// Column used by the pruner to track its true progress. Data at and below this watermark can
-    /// be immediately pruned.
-    pub pruner_hi_inclusive: Option<i64>,
+    /// Column used by the pruner to track its true progress. Data below this watermark can be
+    /// immediately pruned.
+    pub pruner_hi: i64,
 }
 
 impl StoredWatermark {
     pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self {
         StoredWatermark {
-            entity: entity.to_string(),
-            epoch_hi_inclusive: watermark.epoch as i64,
-            checkpoint_hi_inclusive: watermark.cp as i64,
-            tx_hi_inclusive: watermark.tx as i64,
+            pipeline: entity.to_string(),
+            epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
+            checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
+            tx_hi: watermark.tx_hi as i64,
             ..StoredWatermark::default()
         }
     }
 
     pub fn from_lower_bound_update(entity: &str, epoch_lo: u64, reader_lo: u64) -> Self {
         StoredWatermark {
-            entity: entity.to_string(),
+            pipeline: entity.to_string(),
             epoch_lo: epoch_lo as i64,
             reader_lo: reader_lo as i64,
             ..StoredWatermark::default()
@@ -62,7 +62,7 @@ impl StoredWatermark {
     }
 
     pub fn entity(&self) -> Option<PrunableTable> {
-        PrunableTable::from_str(&self.entity).ok()
+        PrunableTable::from_str(&self.pipeline).ok()
     }
 
     /// Determine whether to set a new epoch lower bound based on the retention policy.
diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs
index b149b0f6faa0d..aceb54597c9c5 100644
--- a/crates/sui-indexer/src/schema.rs
+++ b/crates/sui-indexer/src/schema.rs
@@ -370,15 +370,15 @@ diesel::table! {
 }
 
 diesel::table! {
-    watermarks (entity) {
-        entity -> Text,
+    watermarks (pipeline) {
+        pipeline -> Text,
         epoch_hi_inclusive -> Int8,
-        epoch_lo -> Int8,
         checkpoint_hi_inclusive -> Int8,
-        tx_hi_inclusive -> Int8,
+        tx_hi -> Int8,
+        epoch_lo -> Int8,
         reader_lo -> Int8,
         timestamp_ms -> Int8,
-        pruner_hi_inclusive -> Nullable<Int8>,
+        pruner_hi -> Int8,
     }
 }
 
diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs
index aa9c98eb59d2c..2f9aaa5d81cb3 100644
--- a/crates/sui-indexer/src/store/pg_indexer_store.rs
+++ b/crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -300,7 +300,7 @@ impl PgIndexerStore {
 
         watermarks::table
             .select(watermarks::checkpoint_hi_inclusive)
-            .filter(watermarks::entity.eq("objects_snapshot"))
+            .filter(watermarks::pipeline.eq("objects_snapshot"))
             .first::<i64>(&mut connection)
             .await
             // Handle case where the watermark is not set yet
@@ -1525,13 +1525,13 @@ impl PgIndexerStore {
             async {
                 diesel::insert_into(watermarks::table)
                     .values(upper_bound_updates)
-                    .on_conflict(watermarks::entity)
+                    .on_conflict(watermarks::pipeline)
                     .do_update()
                     .set((
                         watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)),
                         watermarks::checkpoint_hi_inclusive
                             .eq(excluded(watermarks::checkpoint_hi_inclusive)),
-                        watermarks::tx_hi_inclusive.eq(excluded(watermarks::tx_hi_inclusive)),
+                        watermarks::tx_hi.eq(excluded(watermarks::tx_hi)),
                     ))
                     .execute(conn)
                     .await
@@ -1622,7 +1622,7 @@ impl PgIndexerStore {
 
                 diesel::insert_into(watermarks::table)
                     .values(lower_bound_updates)
-                    .on_conflict(watermarks::entity)
+                    .on_conflict(watermarks::pipeline)
                     .do_update()
                     .set((
                         watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),