some renaming and pushing the conversion logic into pg_indexer_store, so the pruner just tells the store which table is to be updated and the corresponding new epoch lower bound
wlmyng committed Oct 9, 2024
1 parent 28f06c1 commit cedb9e7
Showing 9 changed files with 134 additions and 62 deletions.
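To summarize the new shape of the pruner-to-store handoff, here is a minimal, self-contained sketch. The enum variants, the unit chosen per table, and the helper name to_reader_lo_updates are illustrative assumptions; the (PrunableTable, u64) interface and the epoch-to-(checkpoint, tx) conversion mirror the diff that follows.

use std::collections::HashMap;

// Stand-in for crate::handlers::pruner::PrunableTable; the variants shown here
// are assumptions for illustration only.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum PrunableTable {
    Checkpoints,
    Transactions,
}

impl PrunableTable {
    // Pick the unit a table's range is keyed on (checkpoint vs. transaction
    // sequence number); the real mapping lives in the sui-indexer crate.
    fn select_lower_bound(&self, cp: u64, tx: u64) -> u64 {
        match self {
            PrunableTable::Checkpoints => cp,
            PrunableTable::Transactions => tx,
        }
    }
}

// The pruner now hands the store only (table, new epoch lower bound) pairs; the
// store converts each epoch into that table's unit via an epoch -> (first
// checkpoint, first tx) mapping, as map_epochs_to_cp_tx does in pg_indexer_store.
fn to_reader_lo_updates(
    watermarks: Vec<(PrunableTable, u64)>,
    epoch_mapping: &HashMap<u64, (u64, u64)>,
) -> Vec<(PrunableTable, u64)> {
    watermarks
        .into_iter()
        .filter_map(|(table, epoch)| {
            let (cp, tx) = *epoch_mapping.get(&epoch)?;
            Some((table, table.select_lower_bound(cp, tx)))
        })
        .collect()
}

fn main() {
    let epoch_mapping = HashMap::from([(71u64, (1_000u64, 50_000u64))]);
    let updates = to_reader_lo_updates(
        vec![
            (PrunableTable::Transactions, 71),
            (PrunableTable::Checkpoints, 71),
        ],
        &epoch_mapping,
    );
    println!("{updates:?}"); // [(Transactions, 50000), (Checkpoints, 1000)]
}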
@@ -0,0 +1,11 @@
ALTER TABLE watermarks
RENAME COLUMN epoch_hi_inclusive TO epoch_hi;

ALTER TABLE watermarks
RENAME COLUMN checkpoint_hi_inclusive TO checkpoint_hi;

ALTER TABLE watermarks
RENAME COLUMN tx_hi_inclusive TO tx_hi;

ALTER TABLE watermarks
RENAME COLUMN pruner_lo TO pruned_lo;
@@ -0,0 +1,11 @@
ALTER TABLE watermarks
RENAME COLUMN epoch_hi TO epoch_hi_inclusive;

ALTER TABLE watermarks
RENAME COLUMN checkpoint_hi TO checkpoint_hi_inclusive;

ALTER TABLE watermarks
RENAME COLUMN tx_hi TO tx_hi_inclusive;

ALTER TABLE watermarks
RENAME COLUMN pruned_lo TO pruner_lo;
22 changes: 11 additions & 11 deletions crates/sui-indexer/src/handlers/committer.rs
@@ -219,17 +219,6 @@ async fn commit_checkpoints<S>(
})
.expect("Persisting data into DB should not fail.");

state
.update_watermarks_upper_bound::<CommitterTables>(committer_watermark)
.await
.tap_err(|e| {
error!(
"Failed to update watermark upper bound with error: {}",
e.to_string()
);
})
.expect("Updating watermark upper bound in DB should not fail.");

if is_epoch_end {
// The epoch has advanced so we update the configs for the new protocol version, if it has changed.
let chain_id = state
@@ -242,6 +231,17 @@
.await;
}

state
.update_watermarks_upper_bound::<CommitterTables>(committer_watermark)
.await
.tap_err(|e| {
error!(
"Failed to update watermark upper bound with error: {}",
e.to_string()
);
})
.expect("Updating watermark upper bound in DB should not fail.");

let elapsed = guard.stop_and_record();

info!(
7 changes: 4 additions & 3 deletions crates/sui-indexer/src/handlers/mod.rs
@@ -164,7 +164,8 @@ pub trait Handler<T>: Send + Sync {

/// The indexer writer operates on checkpoint data, which contains information on the current epoch,
/// checkpoint, and transaction. These three numbers form the watermark upper bound for each
/// committed table.
/// committed table. The reader and pruner are responsible for determining which of the three units
/// will be used for a particular table.
#[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub struct CommitterWatermark {
pub epoch: u64,
@@ -195,7 +196,7 @@ impl From<&CheckpointData> for CommitterWatermark {
}
}

/// Enum representing tables that a committer updates.
/// Enum representing tables that the committer handler writes to.
#[derive(
Debug,
Eq,
@@ -253,7 +254,7 @@ pub enum CommitterTables {
PrunerCpWatermark,
}

/// Enum representing tables that the objects snapshot processor updates.
/// Enum representing tables that the objects snapshot handler writes to.
#[derive(
Debug,
Eq,
32 changes: 12 additions & 20 deletions crates/sui-indexer/src/handlers/pruner.rs
@@ -11,7 +11,7 @@ use tracing::{error, info};

use crate::config::RetentionConfig;
use crate::errors::IndexerError;
use crate::models::watermarks::{PrunableWatermark, StoredWatermark};
use crate::models::watermarks::PrunableWatermark;
use crate::store::pg_partition_manager::PgPartitionManager;
use crate::store::PgIndexerStore;
use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};
@@ -25,8 +25,10 @@ pub struct Pruner {
pub metrics: IndexerMetrics,
}

/// Enum representing tables that the pruner is allowed to prune. The pruner will ignore any table
/// that is not listed here.
/// Enum representing tables that the pruner is allowed to prune. This corresponds to table names in
/// the database, and should be used in lieu of string literals. This enum is also meant to
/// facilitate the process of determining which unit (epoch, cp, or tx) should be used for the
/// table's range. The pruner will ignore any table that is not listed here.
#[derive(
Debug,
Eq,
@@ -243,8 +245,8 @@ async fn update_watermarks_lower_bounds_task(
}
}

/// Fetches all entries from the `watermarks` table, and updates the lower bounds for all watermarks
/// if the entry's epoch range exceeds the respective retention policy.
/// Fetches all entries from the `watermarks` table, and updates the `reader_lo` for each entry if
/// its epoch range exceeds the respective retention policy.
async fn update_watermarks_lower_bounds(
store: &PgIndexerStore,
retention_policies: &HashMap<PrunableTable, u64>,
@@ -267,22 +269,12 @@ async fn update_watermarks_lower_bounds(
continue;
};

if watermark.epoch_lo + epochs_to_keep <= watermark.epoch_hi {
let new_inclusive_epoch_lower_bound =
watermark.epoch_hi.saturating_sub(epochs_to_keep - 1);
if watermark.epoch_lo + epochs_to_keep <= watermark.epoch_hi_inclusive {
let new_epoch_lower_bound = watermark
.epoch_hi_inclusive
.saturating_sub(epochs_to_keep - 1);

// TODO: (wlmyng) now that epochs table is not pruned, we can add `first_tx_seq_num` or
// something and use it as a lookup table.
let (min_cp, _) = store
.get_checkpoint_range_for_epoch(new_inclusive_epoch_lower_bound)
.await?;
let (min_tx, _) = store.get_transaction_range_for_checkpoint(min_cp).await?;

lower_bound_updates.push(StoredWatermark::from_lower_bound_update(
watermark.entity.as_ref(),
new_inclusive_epoch_lower_bound,
watermark.entity.select_lower_bound(min_cp, min_tx),
))
lower_bound_updates.push((watermark.entity, new_epoch_lower_bound));
}
}

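As a quick sanity check of the threshold arithmetic above (the numbers are made up): with a retention policy of 30 epochs and epoch_hi_inclusive = 100, epochs [71, 100] are kept and everything below 71 becomes prunable.

// Standalone check of the retention arithmetic used above; it keeps the most
// recent `epochs_to_keep` epochs, counting `epoch_hi_inclusive` itself.
fn new_lower_bound(epoch_lo: u64, epoch_hi_inclusive: u64, epochs_to_keep: u64) -> Option<u64> {
    (epoch_lo + epochs_to_keep <= epoch_hi_inclusive)
        .then(|| epoch_hi_inclusive.saturating_sub(epochs_to_keep - 1))
}

fn main() {
    // 100 - (30 - 1) = 71: the watermark's lower bound advances to 71.
    assert_eq!(new_lower_bound(0, 100, 30), Some(71));
    // Already within policy ([71, 100] is exactly 30 epochs): no update is pushed.
    assert_eq!(new_lower_bound(71, 100, 30), None);
}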
32 changes: 16 additions & 16 deletions crates/sui-indexer/src/models/watermarks.rs
@@ -18,16 +18,16 @@ pub struct StoredWatermark {
pub entity: String,
/// Inclusive upper epoch bound for this entity's data. Committer updates this field. Pruner uses
/// this to determine if pruning is necessary based on the retention policy.
pub epoch_hi: i64,
pub epoch_hi_inclusive: i64,
/// Inclusive lower epoch bound for this entity's data. Pruner updates this field when the epoch range exceeds the retention policy.
pub epoch_lo: i64,
/// Inclusive upper checkpoint bound for this entity's data. Committer updates this field. All
/// data of this entity in the checkpoint must be persisted before advancing this watermark. The
/// committer refers to this on disaster recovery to resume writing.
pub checkpoint_hi: i64,
pub checkpoint_hi_inclusive: i64,
/// Inclusive upper transaction sequence number bound for this entity's data. Committer updates
/// this field.
pub tx_hi: i64,
pub tx_hi_inclusive: i64,
/// Inclusive low watermark that the pruner advances. Corresponds to the epoch id, checkpoint
/// sequence number, or tx sequence number depending on the entity. Data before this watermark is
/// considered pruned by a reader. The underlying data may still exist in the db instance.
@@ -39,23 +39,23 @@ pub struct StoredWatermark {
/// Column used by the pruner to track its true progress. Data at and below this watermark has
/// been truly pruned from the db, and should no longer exist. When recovering from a crash, the
/// pruner will consult this column to determine where to continue.
pub pruned_lo: Option<i64>,
pub pruner_lo: Option<i64>,
}

#[derive(Debug)]
pub struct PrunableWatermark {
pub entity: PrunableTable,
pub epoch_hi: u64,
pub epoch_hi_inclusive: u64,
pub epoch_lo: u64,
pub checkpoint_hi: u64,
pub tx_hi: u64,
pub checkpoint_hi_inclusive: u64,
pub tx_hi_inclusive: u64,
pub reader_lo: u64,
/// Timestamp when the watermark's lower bound was last updated.
pub timestamp_ms: i64,
/// Latest timestamp read from db.
pub current_timestamp_ms: i64,
/// Data at and below `pruned_lo` is considered pruned by the pruner.
pub pruned_lo: Option<u64>,
pub pruner_lo: Option<u64>,
}

impl PrunableWatermark {
@@ -66,31 +66,31 @@ impl PrunableWatermark {

Some(PrunableWatermark {
entity,
epoch_hi: stored.epoch_hi as u64,
epoch_hi_inclusive: stored.epoch_hi_inclusive as u64,
epoch_lo: stored.epoch_lo as u64,
checkpoint_hi: stored.checkpoint_hi as u64,
tx_hi: stored.tx_hi as u64,
checkpoint_hi_inclusive: stored.checkpoint_hi_inclusive as u64,
tx_hi_inclusive: stored.tx_hi_inclusive as u64,
reader_lo: stored.reader_lo as u64,
timestamp_ms: stored.timestamp_ms,
current_timestamp_ms: latest_db_timestamp,
pruned_lo: stored.pruned_lo.map(|lo| lo as u64),
pruner_lo: stored.pruner_lo.map(|lo| lo as u64),
})
}

/// Represents the first `unit` (checkpoint, tx, epoch) that has not yet been pruned. If
/// `pruned_lo` is not set in db, default to 0. Otherwise, this is `pruned_lo + 1`.
pub fn pruner_lo(&self) -> u64 {
self.pruned_lo.map_or(0, |lo| lo.saturating_add(1))
self.pruner_lo.map_or(0, |lo| lo.saturating_add(1))
}
}

impl StoredWatermark {
pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self {
StoredWatermark {
entity: entity.to_string(),
epoch_hi: watermark.epoch as i64,
checkpoint_hi: watermark.cp as i64,
tx_hi: watermark.tx as i64,
epoch_hi_inclusive: watermark.epoch as i64,
checkpoint_hi_inclusive: watermark.cp as i64,
tx_hi_inclusive: watermark.tx as i64,
..StoredWatermark::default()
}
}
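The pruner_lo() helper above resolves the pruner's recorded progress into the first unit that has not yet been pruned; a minimal standalone restatement of that mapping:

// No recorded progress means nothing has been pruned yet (start from 0);
// otherwise the first unpruned unit is one past the last truly pruned unit.
fn first_unpruned(pruner_lo: Option<u64>) -> u64 {
    pruner_lo.map_or(0, |lo| lo.saturating_add(1))
}

fn main() {
    assert_eq!(first_unpruned(None), 0);
    assert_eq!(first_unpruned(Some(41)), 42);
}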
8 changes: 4 additions & 4 deletions crates/sui-indexer/src/schema.rs
@@ -371,13 +371,13 @@ diesel::table! {
diesel::table! {
watermarks (entity) {
entity -> Text,
epoch_hi -> Int8,
epoch_hi_inclusive -> Int8,
epoch_lo -> Int8,
checkpoint_hi -> Int8,
tx_hi -> Int8,
checkpoint_hi_inclusive -> Int8,
tx_hi_inclusive -> Int8,
reader_lo -> Int8,
timestamp_ms -> Int8,
pruned_lo -> Nullable<Int8>,
pruner_lo -> Nullable<Int8>,
}
}

5 changes: 4 additions & 1 deletion crates/sui-indexer/src/store/indexer_store.rs
@@ -7,6 +7,7 @@ use async_trait::async_trait;
use strum::IntoEnumIterator;

use crate::errors::IndexerError;
use crate::handlers::pruner::PrunableTable;
use crate::handlers::{CommitterWatermark, EpochToCommit, TransactionObjectChangesToCommit};
use crate::models::display::StoredDisplay;
use crate::models::obj_indices::StoredObjectVersion;
@@ -125,9 +126,11 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
where
E::Iterator: Iterator<Item: AsRef<str>>;

/// Updates each watermark entry's lower bounds per the list of tables and their new epoch lower
/// bounds.
async fn update_watermarks_lower_bound(
&self,
watermarks: Vec<StoredWatermark>,
watermarks: Vec<(PrunableTable, u64)>,
) -> Result<(), IndexerError>;

/// Load all watermark entries from the store, and the latest timestamp from the db.
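A hypothetical call site for the reworked trait method (the helper name, enum variants, and epoch value are assumptions for illustration; only the method signature comes from the diff above): the pruner supplies each table and its new epoch lower bound, and the store performs the unit conversion internally.

use crate::errors::IndexerError;
use crate::handlers::pruner::PrunableTable;
use crate::store::IndexerStore;

// Hypothetical helper: push new epoch lower bounds for two tables and let the
// store translate each epoch into that table's checkpoint/tx unit.
async fn advance_lower_bounds<S: IndexerStore>(store: &S) -> Result<(), IndexerError> {
    store
        .update_watermarks_lower_bound(vec![
            (PrunableTable::Transactions, 71),
            (PrunableTable::Checkpoints, 71),
        ])
        .await
}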
68 changes: 61 additions & 7 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -10,6 +10,7 @@ use core::result::Result::Ok;
use csv::{ReaderBuilder, Writer};
use diesel::dsl::{max, min};
use diesel::ExpressionMethods;
use diesel::JoinOnDsl;
use diesel::OptionalExtension;
use diesel::QueryDsl;
use diesel_async::scoped_futures::ScopedFutureExt;
@@ -28,6 +29,7 @@ use sui_storage::object_store::util::put;
use crate::config::UploadOptions;
use crate::database::ConnectionPool;
use crate::errors::{Context, IndexerError};
use crate::handlers::pruner::PrunableTable;
use crate::handlers::TransactionObjectChangesToCommit;
use crate::handlers::{CommitterWatermark, EpochToCommit};
use crate::metrics::IndexerMetrics;
@@ -298,7 +300,7 @@ impl PgIndexerStore {
let mut connection = self.pool.get().await?;

watermarks::table
.select(watermarks::checkpoint_hi)
.select(watermarks::checkpoint_hi_inclusive)
.filter(watermarks::entity.eq("objects_snapshot"))
.first::<i64>(&mut connection)
.await
@@ -1553,9 +1555,10 @@
.on_conflict(watermarks::entity)
.do_update()
.set((
watermarks::epoch_hi.eq(excluded(watermarks::epoch_hi)),
watermarks::checkpoint_hi.eq(excluded(watermarks::checkpoint_hi)),
watermarks::tx_hi.eq(excluded(watermarks::tx_hi)),
watermarks::epoch_hi_inclusive.eq(excluded(watermarks::epoch_hi_inclusive)),
watermarks::checkpoint_hi_inclusive
.eq(excluded(watermarks::checkpoint_hi_inclusive)),
watermarks::tx_hi_inclusive.eq(excluded(watermarks::tx_hi_inclusive)),
))
.execute(conn)
.await
@@ -1576,19 +1579,70 @@
})
}

async fn map_epochs_to_cp_tx(
&self,
epochs: &[u64],
) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
use diesel_async::RunQueryDsl;

let mut connection = self.pool.get().await?;

// TODO: (wlmyng) - having `tx_sequence_number` on `epoch` would simplify this greatly and
// remove dependencies on pruned tables.
let results: Vec<(i64, i64, i64)> = epochs::table
.inner_join(pruner_cp_watermark::table.on(
epochs::first_checkpoint_id.eq(pruner_cp_watermark::checkpoint_sequence_number),
))
.filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
.select((
epochs::epoch,
pruner_cp_watermark::checkpoint_sequence_number,
pruner_cp_watermark::min_tx_sequence_number,
))
.load::<(i64, i64, i64)>(&mut connection)
.await
.map_err(Into::into)
.context("Failed to fetch first checkpoint and tx seq num for epochs")?;

Ok(results
.into_iter()
.map(|(epoch, checkpoint, tx)| (epoch as u64, (checkpoint as u64, tx as u64)))
.collect())
}

async fn update_watermarks_lower_bound(
&self,
watermarks: Vec<StoredWatermark>,
watermarks: Vec<(PrunableTable, u64)>,
) -> Result<(), IndexerError> {
use diesel_async::RunQueryDsl;

let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
let epoch_mapping = self.map_epochs_to_cp_tx(&epochs).await?;
let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
.into_iter()
.map(|(table, epoch)| {
let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
IndexerError::PersistentStorageDataCorruptionError(format!(
"Epoch {} not found in epoch mapping",
epoch
))
})?;

Ok(StoredWatermark::from_lower_bound_update(
table.as_ref(),
epoch,
table.select_lower_bound(*checkpoint, *tx),
))
})
.collect();
let lower_bound_updates = lookups?;

let guard = self
.metrics
.checkpoint_db_commit_latency_watermarks
.start_timer();

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
let lower_bound_updates = watermarks.clone();
async {
use diesel::dsl::sql;
use diesel::query_dsl::methods::FilterDsl;
@@ -2330,7 +2384,7 @@ impl IndexerStore for PgIndexerStore {

async fn update_watermarks_lower_bound(
&self,
watermarks: Vec<StoredWatermark>,
watermarks: Vec<(PrunableTable, u64)>,
) -> Result<(), IndexerError> {
self.update_watermarks_lower_bound(watermarks).await
}
