Skip to content

Commit

Permalink
some renaming and pushing the conversion logic into pg_indexer_store,…
Browse files Browse the repository at this point in the history
… so pruner just tells the store what table is to be updated, and the corresponding new epoch lower bound
  • Loading branch information
wlmyng committed Oct 9, 2024
1 parent 28f06c1 commit 8d0e133
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE watermarks
RENAME COLUMN epoch_hi_inclusive TO epoch_hi;

ALTER TABLE watermarks
RENAME COLUMN checkpoint_hi_inclusive TO checkpoint_hi;

ALTER TABLE watermarks
RENAME COLUMN tx_hi_inclusive TO tx_hi;

ALTER TABLE watermarks
RENAME COLUMN pruner_lo TO pruned_lo;
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE watermarks
RENAME COLUMN epoch_hi TO epoch_hi_inclusive;

ALTER TABLE watermarks
RENAME COLUMN checkpoint_hi TO checkpoint_hi_inclusive;

ALTER TABLE watermarks
RENAME COLUMN tx_hi TO tx_hi_inclusive;

ALTER TABLE watermarks
RENAME COLUMN pruned_lo TO pruner_lo;
22 changes: 11 additions & 11 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,6 @@ async fn commit_checkpoints<S>(
})
.expect("Persisting data into DB should not fail.");

state
.update_watermarks_upper_bound::<CommitterTables>(committer_watermark)
.await
.tap_err(|e| {
error!(
"Failed to update watermark upper bound with error: {}",
e.to_string()
);
})
.expect("Updating watermark upper bound in DB should not fail.");

if is_epoch_end {
// The epoch has advanced so we update the configs for the new protocol version, if it has changed.
let chain_id = state
Expand All @@ -242,6 +231,17 @@ async fn commit_checkpoints<S>(
.await;
}

state
.update_watermarks_upper_bound::<CommitterTables>(committer_watermark)
.await
.tap_err(|e| {
error!(
"Failed to update watermark upper bound with error: {}",
e.to_string()
);
})
.expect("Updating watermark upper bound in DB should not fail.");

let elapsed = guard.stop_and_record();

info!(
Expand Down
7 changes: 4 additions & 3 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ pub trait Handler<T>: Send + Sync {

/// The indexer writer operates on checkpoint data, which contains information on the current epoch,
/// checkpoint, and transaction. These three numbers form the watermark upper bound for each
/// committed table.
/// committed table. The reader and pruner are responsible for determining which of the three units
/// will be used for a particular table.
#[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub struct CommitterWatermark {
pub epoch: u64,
Expand Down Expand Up @@ -195,7 +196,7 @@ impl From<&CheckpointData> for CommitterWatermark {
}
}

/// Enum representing tables that a committer updates.
/// Enum representing tables that the committer handler writes to.
#[derive(
Debug,
Eq,
Expand Down Expand Up @@ -253,7 +254,7 @@ pub enum CommitterTables {
PrunerCpWatermark,
}

/// Enum representing tables that the objects snapshot processor updates.
/// Enum representing tables that the objects snapshot handler writes to.
#[derive(
Debug,
Eq,
Expand Down
32 changes: 12 additions & 20 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::{error, info};

use crate::config::RetentionConfig;
use crate::errors::IndexerError;
use crate::models::watermarks::{PrunableWatermark, StoredWatermark};
use crate::models::watermarks::PrunableWatermark;
use crate::store::pg_partition_manager::PgPartitionManager;
use crate::store::PgIndexerStore;
use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};
Expand All @@ -25,8 +25,10 @@ pub struct Pruner {
pub metrics: IndexerMetrics,
}

/// Enum representing tables that the pruner is allowed to prune. The pruner will ignore any table
/// that is not listed here.
/// Enum representing tables that the pruner is allowed to prune. This corresponds to table names in
/// the database, and should be used in lieu of string literals. This enum is also meant to
/// facilitate the process of determining which unit (epoch, cp, or tx) should be used for the
/// table's range. Pruner will ignore any table that is not listed here.
#[derive(
Debug,
Eq,
Expand Down Expand Up @@ -243,8 +245,8 @@ async fn update_watermarks_lower_bounds_task(
}
}

/// Fetches all entries from the `watermarks` table, and updates the lower bounds for all watermarks
/// if the entry's epoch range exceeds the respective retention policy.
/// Fetches all entries from the `watermarks` table, and updates the `reader_lo` for each entry if
/// its epoch range exceeds the respective retention policy.
async fn update_watermarks_lower_bounds(
store: &PgIndexerStore,
retention_policies: &HashMap<PrunableTable, u64>,
Expand All @@ -267,22 +269,12 @@ async fn update_watermarks_lower_bounds(
continue;
};

if watermark.epoch_lo + epochs_to_keep <= watermark.epoch_hi {
let new_inclusive_epoch_lower_bound =
watermark.epoch_hi.saturating_sub(epochs_to_keep - 1);
if watermark.epoch_lo + epochs_to_keep <= watermark.epoch_hi_inclusive {
let new_epoch_lower_bound = watermark
.epoch_hi_inclusive
.saturating_sub(epochs_to_keep - 1);

// TODO: (wlmyng) now that epochs table is not pruned, we can add `first_tx_seq_num` or
// something and use it as a lookup table.
let (min_cp, _) = store
.get_checkpoint_range_for_epoch(new_inclusive_epoch_lower_bound)
.await?;
let (min_tx, _) = store.get_transaction_range_for_checkpoint(min_cp).await?;

lower_bound_updates.push(StoredWatermark::from_lower_bound_update(
watermark.entity.as_ref(),
new_inclusive_epoch_lower_bound,
watermark.entity.select_lower_bound(min_cp, min_tx),
))
lower_bound_updates.push((watermark.entity, new_epoch_lower_bound));
}
}

Expand Down
36 changes: 17 additions & 19 deletions crates/sui-indexer/src/models/watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ pub struct StoredWatermark {
pub entity: String,
/// Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
/// this to determine if pruning is necessary based on the retention policy.
pub epoch_hi: i64,
pub epoch_hi_inclusive: i64,
/// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
pub epoch_lo: i64,
/// Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
/// data of this entity in the checkpoint must be persisted before advancing this watermark. The
/// committer refers to this on disaster recovery to resume writing.
pub checkpoint_hi: i64,
pub checkpoint_hi_inclusive: i64,
/// Inclusive upper transaction sequence number bound for this entity's data. Committer updates
/// this field.
pub tx_hi: i64,
pub tx_hi_inclusive: i64,
/// Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
/// sequence number, or tx sequence number depending on the entity. Data before this watermark is
/// considered pruned by a reader. The underlying data may still exist in the db instance.
Expand All @@ -39,58 +39,56 @@ pub struct StoredWatermark {
/// Column used by the pruner to track its true progress. Data at and below this watermark has
/// been truly pruned from the db, and should no longer exist. When recovering from a crash, the
/// pruner will consult this column to determine where to continue.
pub pruned_lo: Option<i64>,
pub pruner_lo: Option<i64>,
}

#[derive(Debug)]
pub struct PrunableWatermark {
pub entity: PrunableTable,
pub epoch_hi: u64,
pub epoch_hi_inclusive: u64,
pub epoch_lo: u64,
pub checkpoint_hi: u64,
pub tx_hi: u64,
pub checkpoint_hi_inclusive: u64,
pub tx_hi_inclusive: u64,
pub reader_lo: u64,
/// Timestamp when the watermark's lower bound was last updated.
pub timestamp_ms: i64,
/// Latest timestamp read from db.
pub current_timestamp_ms: i64,
/// Data at and below `pruned_lo` is considered pruned by the pruner.
pub pruned_lo: Option<u64>,
pub pruner_lo: Option<u64>,
}

impl PrunableWatermark {
pub fn new(stored: StoredWatermark, latest_db_timestamp: i64) -> Option<Self> {
let Some(entity) = PrunableTable::from_str(&stored.entity).ok() else {
return None;
};
let entity = PrunableTable::from_str(&stored.entity).ok()?;

Some(PrunableWatermark {
entity,
epoch_hi: stored.epoch_hi as u64,
epoch_hi_inclusive: stored.epoch_hi_inclusive as u64,
epoch_lo: stored.epoch_lo as u64,
checkpoint_hi: stored.checkpoint_hi as u64,
tx_hi: stored.tx_hi as u64,
checkpoint_hi_inclusive: stored.checkpoint_hi_inclusive as u64,
tx_hi_inclusive: stored.tx_hi_inclusive as u64,
reader_lo: stored.reader_lo as u64,
timestamp_ms: stored.timestamp_ms,
current_timestamp_ms: latest_db_timestamp,
pruned_lo: stored.pruned_lo.map(|lo| lo as u64),
pruner_lo: stored.pruner_lo.map(|lo| lo as u64),
})
}

/// Represents the first `unit` (checkpoint, tx, epoch) that has not yet been pruned. If
/// `pruned_lo` is not set in db, default to 0. Otherwise, this is `pruned_lo + `.
pub fn pruner_lo(&self) -> u64 {
self.pruned_lo.map_or(0, |lo| lo.saturating_add(1))
self.pruner_lo.map_or(0, |lo| lo.saturating_add(1))
}
}

impl StoredWatermark {
pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self {
StoredWatermark {
entity: entity.to_string(),
epoch_hi: watermark.epoch as i64,
checkpoint_hi: watermark.cp as i64,
tx_hi: watermark.tx as i64,
epoch_hi_inclusive: watermark.epoch as i64,
checkpoint_hi_inclusive: watermark.cp as i64,
tx_hi_inclusive: watermark.tx as i64,
..StoredWatermark::default()
}
}
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,13 @@ diesel::table! {
diesel::table! {
watermarks (entity) {
entity -> Text,
epoch_hi -> Int8,
epoch_hi_inclusive -> Int8,
epoch_lo -> Int8,
checkpoint_hi -> Int8,
tx_hi -> Int8,
checkpoint_hi_inclusive -> Int8,
tx_hi_inclusive -> Int8,
reader_lo -> Int8,
timestamp_ms -> Int8,
pruned_lo -> Nullable<Int8>,
pruner_lo -> Nullable<Int8>,
}
}

Expand Down
5 changes: 4 additions & 1 deletion crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use async_trait::async_trait;
use strum::IntoEnumIterator;

use crate::errors::IndexerError;
use crate::handlers::pruner::PrunableTable;
use crate::handlers::{CommitterWatermark, EpochToCommit, TransactionObjectChangesToCommit};
use crate::models::display::StoredDisplay;
use crate::models::obj_indices::StoredObjectVersion;
Expand Down Expand Up @@ -125,9 +126,11 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
where
E::Iterator: Iterator<Item: AsRef<str>>;

/// Updates each watermark entry's lower bounds per the list of tables and their new epoch lower
/// bounds.
async fn update_watermarks_lower_bound(
&self,
watermarks: Vec<StoredWatermark>,
watermarks: Vec<(PrunableTable, u64)>,
) -> Result<(), IndexerError>;

/// Load all watermark entries from the store, and the latest timestamp from the db.
Expand Down
Loading

0 comments on commit 8d0e133

Please sign in to comment.