From 80b00a51b146b345af190fe76229458d68053a01 Mon Sep 17 00:00:00 2001 From: wlmyng <127570466+wlmyng@users.noreply.github.com> Date: Fri, 18 Oct 2024 16:12:49 -0700 Subject: [PATCH] [indexer] align watermarks table schema in live indexer to alt indexer (#19908) ## Description Since watermarks table isn't being written to yet, modify the db schema to match alt-indexer. The changes are to rename entity -> pipeline, tx_hi_inclusive -> tx_hi, and pruner_hi_inclusive -> pruner_hi and make it a non-null column. This works out nicely for graphql, since the transactions query implementations expect a half-open interval. Also simplifies pruner logic, since it can write the `reader_lo` as `pruner_hi` after delay, and table pruners will delete between `[table_data, pruner_hi)`. ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- .../pg/2024-09-12-213234_watermarks/up.sql | 51 ++++++++++--------- crates/sui-indexer/src/handlers/committer.rs | 11 ++-- crates/sui-indexer/src/handlers/mod.rs | 23 ++++----- .../src/handlers/objects_snapshot_handler.rs | 2 +- crates/sui-indexer/src/models/watermarks.rs | 28 +++++----- crates/sui-indexer/src/schema.rs | 10 ++-- .../sui-indexer/src/store/pg_indexer_store.rs | 8 +-- 7 files changed, 68 insertions(+), 65 deletions(-) diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql index 2187cadc26f32..73bdc70055246 100644 --- a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql +++ b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql @@ -1,29 +1,34 @@ -CREATE TABLE watermarks +CREATE TABLE IF NOT EXISTS watermarks ( - -- The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`. - entity TEXT NOT NULL, - -- Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses - -- this to determine if pruning is necessary based on the retention policy. + -- The pipeline governed by this watermark, i.e `epochs`, `checkpoints`, + -- `transactions`. + pipeline TEXT PRIMARY KEY, + -- Inclusive upper epoch bound for this entity's data. Committer updates + -- this field. Pruner uses this to determine if pruning is necessary based + -- on the retention policy. epoch_hi_inclusive BIGINT NOT NULL, - -- Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy. - epoch_lo BIGINT NOT NULL, - -- Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All - -- data of this entity in the checkpoint must be persisted before advancing this watermark. The - -- committer refers to this on disaster recovery to resume writing. + -- Inclusive upper checkpoint bound for this entity's data. Committer + -- updates this field. All data of this entity in the checkpoint must be + -- persisted before advancing this watermark. The committer refers to this + -- on disaster recovery to resume writing. checkpoint_hi_inclusive BIGINT NOT NULL, - -- Inclusive upper transaction sequence number bound for this entity's data. Committer updates - -- this field. - tx_hi_inclusive BIGINT NOT NULL, - -- Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint - -- sequence number, or tx sequence number depending on the entity. Data before this watermark is - -- considered pruned by a reader. The underlying data may still exist in the db instance. + -- Exclusive upper transaction sequence number bound for this entity's + -- data. Committer updates this field. + tx_hi BIGINT NOT NULL, + -- Inclusive lower epoch bound for this entity's data. Pruner updates this + -- field when the epoch range exceeds the retention policy. + epoch_lo BIGINT NOT NULL, + -- Inclusive low watermark that the pruner advances. Corresponds to the + -- epoch id, checkpoint sequence number, or tx sequence number depending on + -- the entity. Data before this watermark is considered pruned by a reader. + -- The underlying data may still exist in the db instance. reader_lo BIGINT NOT NULL, - -- Updated using the database's current timestamp when the pruner sees that some data needs to - -- be dropped. The pruner uses this column to determine whether to prune or wait long enough - -- that all in-flight reads complete or timeout before it acts on an updated watermark. + -- Updated using the database's current timestamp when the pruner sees that + -- some data needs to be dropped. The pruner uses this column to determine + -- whether to prune or wait long enough that all in-flight reads complete + -- or timeout before it acts on an updated watermark. timestamp_ms BIGINT NOT NULL, - -- Column used by the pruner to track its true progress. Data at and below this watermark can - -- be immediately pruned. - pruner_hi_inclusive BIGINT, - PRIMARY KEY (entity) + -- Column used by the pruner to track its true progress. Data below this + -- watermark can be immediately pruned. + pruner_hi BIGINT NOT NULL ); diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index 6c08fda947667..4d5ad05d731c5 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -264,19 +264,20 @@ async fn commit_checkpoints( elapsed, "Checkpoint {}-{} committed with {} transactions.", first_checkpoint_seq, - committer_watermark.cp, + committer_watermark.checkpoint_hi_inclusive, tx_count, ); metrics .latest_tx_checkpoint_sequence_number - .set(committer_watermark.cp as i64); + .set(committer_watermark.checkpoint_hi_inclusive as i64); metrics .total_tx_checkpoint_committed .inc_by(checkpoint_num as u64); metrics.total_transaction_committed.inc_by(tx_count as u64); - metrics - .transaction_per_checkpoint - .observe(tx_count as f64 / (committer_watermark.cp - first_checkpoint_seq + 1) as f64); + metrics.transaction_per_checkpoint.observe( + tx_count as f64 + / (committer_watermark.checkpoint_hi_inclusive - first_checkpoint_seq + 1) as f64, + ); // 1000.0 is not necessarily the batch size, it's to roughly map average tx commit latency to [0.1, 1] seconds, // which is well covered by DB_COMMIT_LATENCY_SEC_BUCKETS. metrics diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index c36b3b4d13f4a..a6c6412f3a42c 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -130,7 +130,7 @@ impl CommonHandler { return Ok(()); } for tuple in tuple_chunk { - unprocessed.insert(tuple.0.cp, tuple); + unprocessed.insert(tuple.0.checkpoint_hi_inclusive, tuple); } } Some(None) => break, // Stream has ended @@ -200,17 +200,17 @@ pub trait Handler: Send + Sync { /// will be used for a particular table. #[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq)] pub struct CommitterWatermark { - pub epoch: u64, - pub cp: u64, - pub tx: u64, + pub epoch_hi_inclusive: u64, + pub checkpoint_hi_inclusive: u64, + pub tx_hi: u64, } impl From<&IndexedCheckpoint> for CommitterWatermark { fn from(checkpoint: &IndexedCheckpoint) -> Self { Self { - epoch: checkpoint.epoch, - cp: checkpoint.sequence_number, - tx: checkpoint.network_total_transactions.saturating_sub(1), + epoch_hi_inclusive: checkpoint.epoch, + checkpoint_hi_inclusive: checkpoint.sequence_number, + tx_hi: checkpoint.network_total_transactions, } } } @@ -218,12 +218,9 @@ impl From<&IndexedCheckpoint> for CommitterWatermark { impl From<&CheckpointData> for CommitterWatermark { fn from(checkpoint: &CheckpointData) -> Self { Self { - epoch: checkpoint.checkpoint_summary.epoch, - cp: checkpoint.checkpoint_summary.sequence_number, - tx: checkpoint - .checkpoint_summary - .network_total_transactions - .saturating_sub(1), + epoch_hi_inclusive: checkpoint.checkpoint_summary.epoch, + checkpoint_hi_inclusive: checkpoint.checkpoint_summary.sequence_number, + tx_hi: checkpoint.checkpoint_summary.network_total_transactions, } } } diff --git a/crates/sui-indexer/src/handlers/objects_snapshot_handler.rs b/crates/sui-indexer/src/handlers/objects_snapshot_handler.rs index 960f057b54ddd..816b416fc3743 100644 --- a/crates/sui-indexer/src/handlers/objects_snapshot_handler.rs +++ b/crates/sui-indexer/src/handlers/objects_snapshot_handler.rs @@ -73,7 +73,7 @@ impl Handler for ObjectsSnapshotHandler { self.metrics .latest_object_snapshot_sequence_number - .set(watermark.cp as i64); + .set(watermark.checkpoint_hi_inclusive as i64); Ok(()) } diff --git a/crates/sui-indexer/src/models/watermarks.rs b/crates/sui-indexer/src/models/watermarks.rs index 79bd014c9a500..1ff3d3cfe52ac 100644 --- a/crates/sui-indexer/src/models/watermarks.rs +++ b/crates/sui-indexer/src/models/watermarks.rs @@ -15,19 +15,19 @@ use crate::{ #[diesel(table_name = watermarks, primary_key(entity))] pub struct StoredWatermark { /// The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`. - pub entity: String, + pub pipeline: String, /// Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses /// this to determine if pruning is necessary based on the retention policy. pub epoch_hi_inclusive: i64, - /// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy. - pub epoch_lo: i64, /// Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All /// data of this entity in the checkpoint must be persisted before advancing this watermark. The /// committer refers to this on disaster recovery to resume writing. pub checkpoint_hi_inclusive: i64, - /// Inclusive upper transaction sequence number bound for this entity's data. Committer updates + /// Exclusive upper transaction sequence number bound for this entity's data. Committer updates /// this field. - pub tx_hi_inclusive: i64, + pub tx_hi: i64, + /// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy. + pub epoch_lo: i64, /// Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint /// sequence number, or tx sequence number depending on the entity. Data before this watermark is /// considered pruned by a reader. The underlying data may still exist in the db instance. @@ -36,25 +36,25 @@ pub struct StoredWatermark { /// be dropped. The pruner uses this column to determine whether to prune or wait long enough /// that all in-flight reads complete or timeout before it acts on an updated watermark. pub timestamp_ms: i64, - /// Column used by the pruner to track its true progress. Data at and below this watermark can - /// be immediately pruned. - pub pruner_hi_inclusive: Option, + /// Column used by the pruner to track its true progress. Data below this watermark can be + /// immediately pruned. + pub pruner_hi: i64, } impl StoredWatermark { pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self { StoredWatermark { - entity: entity.to_string(), - epoch_hi_inclusive: watermark.epoch as i64, - checkpoint_hi_inclusive: watermark.cp as i64, - tx_hi_inclusive: watermark.tx as i64, + pipeline: entity.to_string(), + epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64, + checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64, + tx_hi: watermark.tx_hi as i64, ..StoredWatermark::default() } } pub fn from_lower_bound_update(entity: &str, epoch_lo: u64, reader_lo: u64) -> Self { StoredWatermark { - entity: entity.to_string(), + pipeline: entity.to_string(), epoch_lo: epoch_lo as i64, reader_lo: reader_lo as i64, ..StoredWatermark::default() @@ -62,7 +62,7 @@ impl StoredWatermark { } pub fn entity(&self) -> Option { - PrunableTable::from_str(&self.entity).ok() + PrunableTable::from_str(&self.pipeline).ok() } /// Determine whether to set a new epoch lower bound based on the retention policy. diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index b149b0f6faa0d..aceb54597c9c5 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -370,15 +370,15 @@ diesel::table! { } diesel::table! { - watermarks (entity) { - entity -> Text, + watermarks (pipeline) { + pipeline -> Text, epoch_hi_inclusive -> Int8, - epoch_lo -> Int8, checkpoint_hi_inclusive -> Int8, - tx_hi_inclusive -> Int8, + tx_hi -> Int8, + epoch_lo -> Int8, reader_lo -> Int8, timestamp_ms -> Int8, - pruner_hi_inclusive -> Nullable, + pruner_hi -> Int8, } } diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index aa9c98eb59d2c..2f9aaa5d81cb3 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -300,7 +300,7 @@ impl PgIndexerStore { watermarks::table .select(watermarks::checkpoint_hi_inclusive) - .filter(watermarks::entity.eq("objects_snapshot")) + .filter(watermarks::pipeline.eq("objects_snapshot")) .first::(&mut connection) .await // Handle case where the watermark is not set yet @@ -1525,13 +1525,13 @@ impl PgIndexerStore { async { diesel::insert_into(watermarks::table) .values(upper_bound_updates) - .on_conflict(watermarks::entity) + .on_conflict(watermarks::pipeline) .do_update() .set(( watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)), watermarks::checkpoint_hi_inclusive .eq(excluded(watermarks::checkpoint_hi_inclusive)), - watermarks::tx_hi_inclusive.eq(excluded(watermarks::tx_hi_inclusive)), + watermarks::tx_hi.eq(excluded(watermarks::tx_hi)), )) .execute(conn) .await @@ -1622,7 +1622,7 @@ impl PgIndexerStore { diesel::insert_into(watermarks::table) .values(lower_bound_updates) - .on_conflict(watermarks::entity) + .on_conflict(watermarks::pipeline) .do_update() .set(( watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),