Skip to content

Commit

Permalink
[indexer] rename watermarks table fields to clarify inclusive upper b…
Browse files Browse the repository at this point in the history
…ounds (#19793)

## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
wlmyng authored Oct 10, 2024
1 parent a49ca57 commit 75264b9
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ CREATE TABLE watermarks
entity TEXT NOT NULL,
-- Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
-- this to determine if pruning is necessary based on the retention policy.
epoch_hi BIGINT NOT NULL,
epoch_hi_inclusive BIGINT NOT NULL,
-- Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
epoch_lo BIGINT NOT NULL,
-- Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
-- data of this entity in the checkpoint must be persisted before advancing this watermark. The
-- committer refers to this on disaster recovery to resume writing.
checkpoint_hi BIGINT NOT NULL,
checkpoint_hi_inclusive BIGINT NOT NULL,
-- Inclusive upper transaction sequence number bound for this entity's data. Committer updates
-- this field.
tx_hi BIGINT NOT NULL,
tx_hi_inclusive BIGINT NOT NULL,
-- Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
-- sequence number, or tx sequence number depending on the entity. Data before this watermark is
-- considered pruned by a reader. The underlying data may still exist in the db instance.
Expand All @@ -22,9 +22,8 @@ CREATE TABLE watermarks
-- be dropped. The pruner uses this column to determine whether to prune or wait long enough
-- that all in-flight reads complete or timeout before it acts on an updated watermark.
timestamp_ms BIGINT NOT NULL,
-- Column used by the pruner to track its true progress. Data at and below this watermark has
-- been truly pruned from the db, and should no longer exist. When recovering from a crash, the
-- pruner will consult this column to determine where to continue.
pruned_lo BIGINT,
-- Column used by the pruner to track its true progress. Data at and below this watermark can
-- be immediately pruned.
pruner_lo BIGINT,
PRIMARY KEY (entity)
);
22 changes: 11 additions & 11 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,6 @@ async fn commit_checkpoints<S>(
})
.expect("Persisting data into DB should not fail.");

state
.update_watermarks_upper_bound::<CommitterTables>(committer_watermark)
.await
.tap_err(|e| {
error!(
"Failed to update watermark upper bound with error: {}",
e.to_string()
);
})
.expect("Updating watermark upper bound in DB should not fail.");

if is_epoch_end {
// The epoch has advanced so we update the configs for the new protocol version, if it has changed.
let chain_id = state
Expand All @@ -242,6 +231,17 @@ async fn commit_checkpoints<S>(
.await;
}

state
.update_watermarks_upper_bound::<CommitterTables>(committer_watermark)
.await
.tap_err(|e| {
error!(
"Failed to update watermark upper bound with error: {}",
e.to_string()
);
})
.expect("Updating watermark upper bound in DB should not fail.");

let elapsed = guard.stop_and_record();

info!(
Expand Down
4 changes: 2 additions & 2 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl From<&CheckpointData> for CommitterWatermark {
}
}

/// Enum representing tables that a committer updates.
/// Enum representing tables that the committer handler writes to.
#[derive(
Debug,
Eq,
Expand Down Expand Up @@ -253,7 +253,7 @@ pub enum CommitterTables {
PrunerCpWatermark,
}

/// Enum representing tables that the objects snapshot processor updates.
/// Enum representing tables that the objects snapshot handler writes to.
#[derive(
Debug,
Eq,
Expand Down
19 changes: 9 additions & 10 deletions crates/sui-indexer/src/models/watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ pub struct StoredWatermark {
pub entity: String,
/// Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
/// this to determine if pruning is necessary based on the retention policy.
pub epoch_hi: i64,
pub epoch_hi_inclusive: i64,
/// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
pub epoch_lo: i64,
/// Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
/// data of this entity in the checkpoint must be persisted before advancing this watermark. The
/// committer refers to this on disaster recovery to resume writing.
pub checkpoint_hi: i64,
pub checkpoint_hi_inclusive: i64,
/// Inclusive upper transaction sequence number bound for this entity's data. Committer updates
/// this field.
pub tx_hi: i64,
pub tx_hi_inclusive: i64,
/// Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
/// sequence number, or tx sequence number depending on the entity. Data before this watermark is
/// considered pruned by a reader. The underlying data may still exist in the db instance.
Expand All @@ -33,19 +33,18 @@ pub struct StoredWatermark {
/// be dropped. The pruner uses this column to determine whether to prune or wait long enough
/// that all in-flight reads complete or timeout before it acts on an updated watermark.
pub timestamp_ms: i64,
/// Column used by the pruner to track its true progress. Data at and below this watermark has
/// been truly pruned from the db, and should no longer exist. When recovering from a crash, the
/// pruner will consult this column to determine where to continue.
pub pruned_lo: Option<i64>,
/// Column used by the pruner to track its true progress. Data at and below this watermark can
/// be immediately pruned.
pub pruner_lo: Option<i64>,
}

impl StoredWatermark {
pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self {
StoredWatermark {
entity: entity.to_string(),
epoch_hi: watermark.epoch as i64,
checkpoint_hi: watermark.cp as i64,
tx_hi: watermark.tx as i64,
epoch_hi_inclusive: watermark.epoch as i64,
checkpoint_hi_inclusive: watermark.cp as i64,
tx_hi_inclusive: watermark.tx as i64,
..StoredWatermark::default()
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,13 @@ diesel::table! {
diesel::table! {
watermarks (entity) {
entity -> Text,
epoch_hi -> Int8,
epoch_hi_inclusive -> Int8,
epoch_lo -> Int8,
checkpoint_hi -> Int8,
tx_hi -> Int8,
checkpoint_hi_inclusive -> Int8,
tx_hi_inclusive -> Int8,
reader_lo -> Int8,
timestamp_ms -> Int8,
pruned_lo -> Nullable<Int8>,
pruner_lo -> Nullable<Int8>,
}
}

Expand Down
9 changes: 5 additions & 4 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ impl PgIndexerStore {
let mut connection = self.pool.get().await?;

watermarks::table
.select(watermarks::checkpoint_hi)
.select(watermarks::checkpoint_hi_inclusive)
.filter(watermarks::entity.eq("objects_snapshot"))
.first::<i64>(&mut connection)
.await
Expand Down Expand Up @@ -1604,9 +1604,10 @@ impl PgIndexerStore {
.on_conflict(watermarks::entity)
.do_update()
.set((
watermarks::epoch_hi.eq(excluded(watermarks::epoch_hi)),
watermarks::checkpoint_hi.eq(excluded(watermarks::checkpoint_hi)),
watermarks::tx_hi.eq(excluded(watermarks::tx_hi)),
watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)),
watermarks::checkpoint_hi_inclusive
.eq(excluded(watermarks::checkpoint_hi_inclusive)),
watermarks::tx_hi_inclusive.eq(excluded(watermarks::tx_hi_inclusive)),
))
.execute(conn)
.await
Expand Down

0 comments on commit 75264b9

Please sign in to comment.