[indexer] align watermarks table schema in live indexer to alt indexer (#19908)

## Description 

Since the watermarks table isn't being written to yet, modify the db
schema to match alt-indexer. The changes rename entity -> pipeline,
tx_hi_inclusive -> tx_hi, and pruner_hi_inclusive -> pruner_hi, and make
pruner_hi a non-null column. Both tx_hi and pruner_hi are now exclusive
upper bounds. This works out nicely for graphql, since the transactions
query implementations expect a half-open interval. It also simplifies
the pruner logic, since the pruner can write `reader_lo` as `pruner_hi`
after the delay, and table pruners will delete between
`[table_data, pruner_hi)`.
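
To make the half-open convention concrete, here is a minimal sketch. It is not the indexer's code: the `Watermark` struct, its helper methods, and the sample numbers are hypothetical, and only the column names `tx_hi`, `reader_lo`, and `pruner_hi` come from this change. It assumes `network_total_transactions` is a running count, so storing it directly gives an exclusive upper bound.

```rust
use std::ops::Range;

/// Hypothetical, simplified watermark holding three of the columns above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Watermark {
    /// Exclusive upper bound on transaction sequence numbers.
    tx_hi: u64,
    /// Inclusive lower bound that readers may see.
    reader_lo: u64,
    /// Exclusive upper bound of what the pruner may delete.
    pruner_hi: u64,
}

impl Watermark {
    /// Store the running transaction count as-is; no `saturating_sub(1)` needed.
    fn from_network_total(network_total_transactions: u64) -> Self {
        Watermark {
            tx_hi: network_total_transactions,
            reader_lo: 0,
            pruner_hi: 0,
        }
    }

    /// Transactions visible to readers: `[reader_lo, tx_hi)`.
    fn readable_txs(&self) -> Range<u64> {
        self.reader_lo..self.tx_hi
    }

    /// Rows a table pruner may delete: `[table_lo, pruner_hi)`.
    fn prunable(&self, table_lo: u64) -> Range<u64> {
        table_lo..self.pruner_hi
    }

    /// After the delay has elapsed, the pruner copies `reader_lo` into `pruner_hi`.
    fn catch_up_pruner(&mut self) {
        self.pruner_hi = self.reader_lo;
    }
}

fn main() {
    let mut wm = Watermark::from_network_total(100);
    assert_eq!(wm.readable_txs(), 0..100);
    assert_eq!(wm.readable_txs().count(), 100);

    // Readers move past the first 40 transactions, then the pruner catches up.
    wm.reader_lo = 40;
    wm.catch_up_pruner();
    assert_eq!(wm.prunable(10), 10..40);

    // A zero count yields an empty range. An inclusive bound computed with
    // `saturating_sub(1)` would be 0 for both zero and one transaction.
    assert!(Watermark::from_network_total(0).readable_txs().is_empty());
    println!("readable: {:?}", wm.readable_txs());
}
```

With exclusive upper bounds, the length of a range is simply `hi - lo`, and an empty table needs no special case.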

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
wlmyng authored Oct 18, 2024
1 parent 8519d53 commit 80b00a5
Showing 7 changed files with 68 additions and 65 deletions.
@@ -1,29 +1,34 @@
CREATE TABLE watermarks
CREATE TABLE IF NOT EXISTS watermarks
(
-- The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`.
entity TEXT NOT NULL,
-- Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
-- this to determine if pruning is necessary based on the retention policy.
-- The pipeline governed by this watermark, i.e `epochs`, `checkpoints`,
-- `transactions`.
pipeline TEXT PRIMARY KEY,
-- Inclusive upper epoch bound for this entity's data. Committer updates
-- this field. Pruner uses this to determine if pruning is necessary based
-- on the retention policy.
epoch_hi_inclusive BIGINT NOT NULL,
-- Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
epoch_lo BIGINT NOT NULL,
-- Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
-- data of this entity in the checkpoint must be persisted before advancing this watermark. The
-- committer refers to this on disaster recovery to resume writing.
-- Inclusive upper checkpoint bound for this entity's data. Committer
-- updates this field. All data of this entity in the checkpoint must be
-- persisted before advancing this watermark. The committer refers to this
-- on disaster recovery to resume writing.
checkpoint_hi_inclusive BIGINT NOT NULL,
-- Inclusive upper transaction sequence number bound for this entity's data. Committer updates
-- this field.
tx_hi_inclusive BIGINT NOT NULL,
-- Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
-- sequence number, or tx sequence number depending on the entity. Data before this watermark is
-- considered pruned by a reader. The underlying data may still exist in the db instance.
-- Exclusive upper transaction sequence number bound for this entity's
-- data. Committer updates this field.
tx_hi BIGINT NOT NULL,
-- Inclusive lower epoch bound for this entity's data. Pruner updates this
-- field when the epoch range exceeds the retention policy.
epoch_lo BIGINT NOT NULL,
-- Inclusive low watermark that the pruner advances. Corresponds to the
-- epoch id, checkpoint sequence number, or tx sequence number depending on
-- the entity. Data before this watermark is considered pruned by a reader.
-- The underlying data may still exist in the db instance.
reader_lo BIGINT NOT NULL,
-- Updated using the database's current timestamp when the pruner sees that some data needs to
-- be dropped. The pruner uses this column to determine whether to prune or wait long enough
-- that all in-flight reads complete or timeout before it acts on an updated watermark.
-- Updated using the database's current timestamp when the pruner sees that
-- some data needs to be dropped. The pruner uses this column to determine
-- whether to prune or wait long enough that all in-flight reads complete
-- or timeout before it acts on an updated watermark.
timestamp_ms BIGINT NOT NULL,
-- Column used by the pruner to track its true progress. Data at and below this watermark can
-- be immediately pruned.
pruner_hi_inclusive BIGINT,
PRIMARY KEY (entity)
-- Column used by the pruner to track its true progress. Data below this
-- watermark can be immediately pruned.
pruner_hi BIGINT NOT NULL
);
11 changes: 6 additions & 5 deletions crates/sui-indexer/src/handlers/committer.rs
@@ -264,19 +264,20 @@ async fn commit_checkpoints<S>(
elapsed,
"Checkpoint {}-{} committed with {} transactions.",
first_checkpoint_seq,
committer_watermark.cp,
committer_watermark.checkpoint_hi_inclusive,
tx_count,
);
metrics
.latest_tx_checkpoint_sequence_number
.set(committer_watermark.cp as i64);
.set(committer_watermark.checkpoint_hi_inclusive as i64);
metrics
.total_tx_checkpoint_committed
.inc_by(checkpoint_num as u64);
metrics.total_transaction_committed.inc_by(tx_count as u64);
metrics
.transaction_per_checkpoint
.observe(tx_count as f64 / (committer_watermark.cp - first_checkpoint_seq + 1) as f64);
metrics.transaction_per_checkpoint.observe(
tx_count as f64
/ (committer_watermark.checkpoint_hi_inclusive - first_checkpoint_seq + 1) as f64,
);
// 1000.0 is not necessarily the batch size, it's to roughly map average tx commit latency to [0.1, 1] seconds,
// which is well covered by DB_COMMIT_LATENCY_SEC_BUCKETS.
metrics
23 changes: 10 additions & 13 deletions crates/sui-indexer/src/handlers/mod.rs
@@ -130,7 +130,7 @@ impl<T> CommonHandler<T> {
return Ok(());
}
for tuple in tuple_chunk {
unprocessed.insert(tuple.0.cp, tuple);
unprocessed.insert(tuple.0.checkpoint_hi_inclusive, tuple);
}
}
Some(None) => break, // Stream has ended
@@ -200,30 +200,27 @@ pub trait Handler<T>: Send + Sync {
/// will be used for a particular table.
#[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub struct CommitterWatermark {
pub epoch: u64,
pub cp: u64,
pub tx: u64,
pub epoch_hi_inclusive: u64,
pub checkpoint_hi_inclusive: u64,
pub tx_hi: u64,
}

impl From<&IndexedCheckpoint> for CommitterWatermark {
fn from(checkpoint: &IndexedCheckpoint) -> Self {
Self {
epoch: checkpoint.epoch,
cp: checkpoint.sequence_number,
tx: checkpoint.network_total_transactions.saturating_sub(1),
epoch_hi_inclusive: checkpoint.epoch,
checkpoint_hi_inclusive: checkpoint.sequence_number,
tx_hi: checkpoint.network_total_transactions,
}
}
}

impl From<&CheckpointData> for CommitterWatermark {
fn from(checkpoint: &CheckpointData) -> Self {
Self {
epoch: checkpoint.checkpoint_summary.epoch,
cp: checkpoint.checkpoint_summary.sequence_number,
tx: checkpoint
.checkpoint_summary
.network_total_transactions
.saturating_sub(1),
epoch_hi_inclusive: checkpoint.checkpoint_summary.epoch,
checkpoint_hi_inclusive: checkpoint.checkpoint_summary.sequence_number,
tx_hi: checkpoint.checkpoint_summary.network_total_transactions,
}
}
}
@@ -73,7 +73,7 @@ impl Handler<TransactionObjectChangesToCommit> for ObjectsSnapshotHandler {

self.metrics
.latest_object_snapshot_sequence_number
.set(watermark.cp as i64);
.set(watermark.checkpoint_hi_inclusive as i64);
Ok(())
}

28 changes: 14 additions & 14 deletions crates/sui-indexer/src/models/watermarks.rs
@@ -15,19 +15,19 @@ use crate::{
#[diesel(table_name = watermarks, primary_key(entity))]
pub struct StoredWatermark {
/// The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`.
pub entity: String,
pub pipeline: String,
/// Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
/// this to determine if pruning is necessary based on the retention policy.
pub epoch_hi_inclusive: i64,
/// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
pub epoch_lo: i64,
/// Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
/// data of this entity in the checkpoint must be persisted before advancing this watermark. The
/// committer refers to this on disaster recovery to resume writing.
pub checkpoint_hi_inclusive: i64,
/// Inclusive upper transaction sequence number bound for this entity's data. Committer updates
/// Exclusive upper transaction sequence number bound for this entity's data. Committer updates
/// this field.
pub tx_hi_inclusive: i64,
pub tx_hi: i64,
/// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
pub epoch_lo: i64,
/// Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
/// sequence number, or tx sequence number depending on the entity. Data before this watermark is
/// considered pruned by a reader. The underlying data may still exist in the db instance.
@@ -36,33 +36,33 @@ pub struct StoredWatermark {
/// be dropped. The pruner uses this column to determine whether to prune or wait long enough
/// that all in-flight reads complete or timeout before it acts on an updated watermark.
pub timestamp_ms: i64,
/// Column used by the pruner to track its true progress. Data at and below this watermark can
/// be immediately pruned.
pub pruner_hi_inclusive: Option<i64>,
/// Column used by the pruner to track its true progress. Data below this watermark can be
/// immediately pruned.
pub pruner_hi: i64,
}

impl StoredWatermark {
pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self {
StoredWatermark {
entity: entity.to_string(),
epoch_hi_inclusive: watermark.epoch as i64,
checkpoint_hi_inclusive: watermark.cp as i64,
tx_hi_inclusive: watermark.tx as i64,
pipeline: entity.to_string(),
epoch_hi_inclusive: watermark.epoch_hi_inclusive as i64,
checkpoint_hi_inclusive: watermark.checkpoint_hi_inclusive as i64,
tx_hi: watermark.tx_hi as i64,
..StoredWatermark::default()
}
}

pub fn from_lower_bound_update(entity: &str, epoch_lo: u64, reader_lo: u64) -> Self {
StoredWatermark {
entity: entity.to_string(),
pipeline: entity.to_string(),
epoch_lo: epoch_lo as i64,
reader_lo: reader_lo as i64,
..StoredWatermark::default()
}
}

pub fn entity(&self) -> Option<PrunableTable> {
PrunableTable::from_str(&self.entity).ok()
PrunableTable::from_str(&self.pipeline).ok()
}

/// Determine whether to set a new epoch lower bound based on the retention policy.
10 changes: 5 additions & 5 deletions crates/sui-indexer/src/schema.rs
@@ -370,15 +370,15 @@ diesel::table! {
}

diesel::table! {
watermarks (entity) {
entity -> Text,
watermarks (pipeline) {
pipeline -> Text,
epoch_hi_inclusive -> Int8,
epoch_lo -> Int8,
checkpoint_hi_inclusive -> Int8,
tx_hi_inclusive -> Int8,
tx_hi -> Int8,
epoch_lo -> Int8,
reader_lo -> Int8,
timestamp_ms -> Int8,
pruner_hi_inclusive -> Nullable<Int8>,
pruner_hi -> Int8,
}
}

8 changes: 4 additions & 4 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -300,7 +300,7 @@ impl PgIndexerStore {

watermarks::table
.select(watermarks::checkpoint_hi_inclusive)
.filter(watermarks::entity.eq("objects_snapshot"))
.filter(watermarks::pipeline.eq("objects_snapshot"))
.first::<i64>(&mut connection)
.await
// Handle case where the watermark is not set yet
@@ -1525,13 +1525,13 @@ impl PgIndexerStore {
async {
diesel::insert_into(watermarks::table)
.values(upper_bound_updates)
.on_conflict(watermarks::entity)
.on_conflict(watermarks::pipeline)
.do_update()
.set((
watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)),
watermarks::checkpoint_hi_inclusive
.eq(excluded(watermarks::checkpoint_hi_inclusive)),
watermarks::tx_hi_inclusive.eq(excluded(watermarks::tx_hi_inclusive)),
watermarks::tx_hi.eq(excluded(watermarks::tx_hi)),
))
.execute(conn)
.await
@@ -1622,7 +1622,7 @@ impl PgIndexerStore {

diesel::insert_into(watermarks::table)
.values(lower_bound_updates)
.on_conflict(watermarks::entity)
.on_conflict(watermarks::pipeline)
.do_update()
.set((
watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),