diff --git a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql
index e8499c692120c7..73d7a4dcf16f52 100644
--- a/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql
+++ b/crates/sui-indexer/migrations/pg/2024-09-12-213234_watermarks/up.sql
@@ -22,8 +22,8 @@ CREATE TABLE watermarks
     -- be dropped. The pruner uses this column to determine whether to prune or wait long enough
     -- that all in-flight reads complete or timeout before it acts on an updated watermark.
     timestamp_ms BIGINT NOT NULL,
-    -- Column used by the pruner to track its true progress. Data at and below this watermark can
-    -- be immediately pruned.
+    -- Updated and used by the pruner. Data up to but excluding this watermark can be immediately
+    -- dropped. Data between this and `reader_lo` can be pruned after a delay.
    pruner_lo BIGINT,
     PRIMARY KEY (entity)
 );
diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs
index eb99d56abe36b5..e9984cb1064b8d 100644
--- a/crates/sui-indexer/src/handlers/mod.rs
+++ b/crates/sui-indexer/src/handlers/mod.rs
@@ -187,7 +187,8 @@ pub trait Handler: Send + Sync {
 
 /// The indexer writer operates on checkpoint data, which contains information on the current epoch,
 /// checkpoint, and transaction. These three numbers form the watermark upper bound for each
-/// committed table.
+/// committed table. The reader and pruner are responsible for determining which of the three units
+/// will be used for a particular table.
 #[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
 pub struct CommitterWatermark {
     pub epoch: u64,
diff --git a/crates/sui-indexer/src/handlers/pruner.rs b/crates/sui-indexer/src/handlers/pruner.rs
index 12f1bca4ebfd91..d298962af93cd9 100644
--- a/crates/sui-indexer/src/handlers/pruner.rs
+++ b/crates/sui-indexer/src/handlers/pruner.rs
@@ -1,6 +1,7 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
+use mysten_metrics::spawn_monitored_task;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::time::Duration;
@@ -10,6 +11,7 @@ use tracing::{error, info};
 
 use crate::config::RetentionConfig;
 use crate::errors::IndexerError;
+use crate::models::watermarks::PrunableWatermark;
 use crate::store::pg_partition_manager::PgPartitionManager;
 use crate::store::PgIndexerStore;
 use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};
@@ -23,8 +25,10 @@ pub struct Pruner {
     pub metrics: IndexerMetrics,
 }
 
-/// Enum representing tables that the pruner is allowed to prune. The pruner will ignore any table
-/// that is not listed here.
+/// Enum representing tables that the pruner is allowed to prune. Its variants correspond to table
+/// names in the database and should be used in lieu of string literals. The enum also determines
+/// which unit (epoch, cp, or tx) is used for a table's pruning range. The pruner will ignore any
+/// table that is not listed here.
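For illustration, a minimal sketch of how this enum replaces string literals, assuming the strum-style derives the rest of this change relies on (`from_str` to parse stored entity names, `as_ref` to write them back); the helper name is hypothetical and not part of the diff:

```rust
use std::str::FromStr;

// Resolve a stored `entity` string to the enum, then pick the unit for its
// reader lower bound via `select_reader_lo` (added just below). Unknown table
// names are skipped rather than pruned.
fn reader_lo_for(entity: &str, cp: u64, tx: u64) -> Option<u64> {
    let table = PrunableTable::from_str(entity).ok()?;
    Some(table.select_reader_lo(cp, tx))
}
```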
 #[derive(
     Debug,
     Eq,
@@ -69,6 +73,48 @@ pub enum PrunableTable {
     PrunerCpWatermark,
 }
 
+impl PrunableTable {
+    pub fn select_reader_lo(&self, cp: u64, tx: u64) -> u64 {
+        match self {
+            PrunableTable::ObjectsHistory => cp,
+            PrunableTable::Transactions => tx,
+            PrunableTable::Events => tx,
+
+            PrunableTable::EventEmitPackage => tx,
+            PrunableTable::EventEmitModule => tx,
+            PrunableTable::EventSenders => tx,
+            PrunableTable::EventStructInstantiation => tx,
+            PrunableTable::EventStructModule => tx,
+            PrunableTable::EventStructName => tx,
+            PrunableTable::EventStructPackage => tx,
+
+            PrunableTable::TxAffectedAddresses => tx,
+            PrunableTable::TxAffectedObjects => tx,
+            PrunableTable::TxCallsPkg => tx,
+            PrunableTable::TxCallsMod => tx,
+            PrunableTable::TxCallsFun => tx,
+            PrunableTable::TxChangedObjects => tx,
+            PrunableTable::TxDigests => tx,
+            PrunableTable::TxInputObjects => tx,
+            PrunableTable::TxKinds => tx,
+            PrunableTable::TxRecipients => tx,
+            PrunableTable::TxSenders => tx,
+
+            PrunableTable::Checkpoints => cp,
+            PrunableTable::PrunerCpWatermark => cp,
+        }
+    }
+
+    pub fn select_pruner_lo(&self, epoch_lo: u64, reader_lo: u64) -> u64 {
+        match self {
+            PrunableTable::ObjectsHistory => epoch_lo,
+            PrunableTable::Transactions => epoch_lo,
+            PrunableTable::Events => epoch_lo,
+            _ => reader_lo,
+        }
+    }
+}
+
 impl Pruner {
     /// Instantiates a pruner with default retention and overrides. Pruner will finalize the
     /// retention policies so there is a value for every prunable table.
@@ -101,6 +147,15 @@ impl Pruner {
     }
 
     pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
+        let store_clone = self.store.clone();
+        let retention_policies = self.retention_policies.clone();
+        let cancel_clone = cancel.clone();
+        spawn_monitored_task!(update_watermarks_lower_bounds_task(
+            store_clone,
+            retention_policies,
+            cancel_clone
+        ));
+
         let mut last_seen_max_epoch = 0;
         // The first epoch that has not yet been pruned.
         let mut next_prune_epoch = None;
@@ -177,3 +232,67 @@ impl Pruner {
         Ok(())
     }
 }
+
+/// Task to periodically query the `watermarks` table and update the lower bound of each watermark
+/// whose epoch range exceeds its retention policy.
+async fn update_watermarks_lower_bounds_task(
+    store: PgIndexerStore,
+    retention_policies: HashMap<PrunableTable, u64>,
+    cancel: CancellationToken,
+) -> IndexerResult<()> {
+    let mut interval = tokio::time::interval(Duration::from_secs(5));
+    loop {
+        tokio::select! {
+            _ = cancel.cancelled() => {
+                info!("Pruner watermark lower bound update task cancelled.");
+                return Ok(());
+            }
+            _ = interval.tick() => {
+                update_watermarks_lower_bounds(&store, &retention_policies, &cancel).await?;
+            }
+        }
+    }
+}
+
+/// Fetches all entries from the `watermarks` table, and updates the `reader_lo` for each entry if
+/// its epoch range exceeds the respective retention policy.
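As a worked example of the retention check implemented below: with a retention policy of 4 epochs and `epoch_hi_inclusive = 10`, epochs 7..=10 are kept, so the lower bound only advances once `epoch_lo` has fallen to 6 or below. A minimal sketch (the function name is hypothetical, not part of the diff):

```rust
fn proposed_epoch_lo(epoch_lo: u64, epoch_hi_inclusive: u64, epochs_to_keep: u64) -> Option<u64> {
    // Mirrors the check and bound computation in `update_watermarks_lower_bounds`.
    (epoch_lo + epochs_to_keep <= epoch_hi_inclusive)
        .then(|| epoch_hi_inclusive.saturating_sub(epochs_to_keep - 1))
}

fn main() {
    assert_eq!(proposed_epoch_lo(6, 10, 4), Some(7)); // advance: keep epochs 7..=10
    assert_eq!(proposed_epoch_lo(7, 10, 4), None); // already within the retention window
}
```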
+async fn update_watermarks_lower_bounds(
+    store: &PgIndexerStore,
+    retention_policies: &HashMap<PrunableTable, u64>,
+    cancel: &CancellationToken,
+) -> IndexerResult<()> {
+    let (watermarks, latest_db_timestamp) = store.get_watermarks().await?;
+    let mut lower_bound_updates = vec![];
+
+    for watermark in watermarks.iter() {
+        if cancel.is_cancelled() {
+            info!("Pruner watermark lower bound update task cancelled.");
+            return Ok(());
+        }
+
+        let Some(watermark) = PrunableWatermark::new(watermark.clone(), latest_db_timestamp) else {
+            continue;
+        };
+
+        let Some(epochs_to_keep) = retention_policies.get(&watermark.entity) else {
+            continue;
+        };
+
+        if watermark.epoch_lo + epochs_to_keep <= watermark.epoch_hi_inclusive {
+            let new_epoch_lo = watermark
+                .epoch_hi_inclusive
+                .saturating_sub(epochs_to_keep - 1);
+
+            lower_bound_updates.push((watermark, new_epoch_lo));
+        }
+    }
+
+    if !lower_bound_updates.is_empty() {
+        store
+            .update_watermarks_lower_bound(lower_bound_updates)
+            .await?;
+        info!("Finished updating lower bounds for watermarks");
+    }
+
+    Ok(())
+}
diff --git a/crates/sui-indexer/src/models/watermarks.rs b/crates/sui-indexer/src/models/watermarks.rs
index e3c2395fec6369..06734371d15dcd 100644
--- a/crates/sui-indexer/src/models/watermarks.rs
+++ b/crates/sui-indexer/src/models/watermarks.rs
@@ -1,14 +1,17 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
+use std::str::FromStr;
+
+use diesel::prelude::*;
+
 use crate::{
-    handlers::CommitterWatermark,
+    handlers::{pruner::PrunableTable, CommitterWatermark},
     schema::watermarks::{self},
 };
-use diesel::prelude::*;
 
 /// Represents a row in the `watermarks` table.
-#[derive(Queryable, Insertable, Default, QueryableByName)]
+#[derive(Queryable, Insertable, Default, QueryableByName, Clone)]
 #[diesel(table_name = watermarks, primary_key(entity))]
 pub struct StoredWatermark {
     /// The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`.
@@ -33,11 +36,72 @@ pub struct StoredWatermark {
     /// be dropped. The pruner uses this column to determine whether to prune or wait long enough
     /// that all in-flight reads complete or timeout before it acts on an updated watermark.
     pub timestamp_ms: i64,
-    /// Column used by the pruner to track its true progress. Data at and below this watermark can
-    /// be immediately pruned.
+    /// Updated and used by the pruner. Data up to but excluding this watermark can be immediately
+    /// dropped. Data between this and `reader_lo` can be pruned after a delay.
     pub pruner_lo: Option<i64>,
 }
 
+#[derive(Debug)]
+pub struct PrunableWatermark {
+    pub entity: PrunableTable,
+    pub epoch_hi_inclusive: u64,
+    pub epoch_lo: u64,
+    pub checkpoint_hi_inclusive: u64,
+    pub tx_hi_inclusive: u64,
+    pub reader_lo: u64,
+    /// Timestamp when the watermark's lower bound was last updated.
+    pub timestamp_ms: i64,
+    /// Latest timestamp read from db.
+    pub current_timestamp_ms: i64,
+    /// Data at and below `pruner_lo` is considered pruned by the pruner.
+    pub pruner_lo: Option<u64>,
+}
+
+impl PrunableWatermark {
+    pub fn new(stored: StoredWatermark, latest_db_timestamp: i64) -> Option<Self> {
+        let entity = PrunableTable::from_str(&stored.entity).ok()?;
+
+        Some(PrunableWatermark {
+            entity,
+            epoch_hi_inclusive: stored.epoch_hi_inclusive as u64,
+            epoch_lo: stored.epoch_lo as u64,
+            checkpoint_hi_inclusive: stored.checkpoint_hi_inclusive as u64,
+            tx_hi_inclusive: stored.tx_hi_inclusive as u64,
+            reader_lo: stored.reader_lo as u64,
+            timestamp_ms: stored.timestamp_ms,
+            current_timestamp_ms: latest_db_timestamp,
+            pruner_lo: stored.pruner_lo.map(|lo| lo as u64),
+        })
+    }
+
+    pub fn update(&mut self, new_epoch_lo: u64, new_reader_lo: u64) {
+        self.pruner_lo = Some(match self.entity {
+            PrunableTable::ObjectsHistory => self.epoch_lo,
+            PrunableTable::Transactions => self.epoch_lo,
+            PrunableTable::Events => self.epoch_lo,
+            _ => self.reader_lo,
+        });
+
+        self.epoch_lo = new_epoch_lo;
+        self.reader_lo = new_reader_lo;
+    }
+
+    /// Represents the exclusive upper bound of data that can be pruned immediately.
+    pub fn immediately_prunable_hi(&self) -> Option<u64> {
+        self.pruner_lo
+    }
+
+    /// Represents the lower bound of data that can be pruned after a delay.
+    pub fn delayed_prunable_lo(&self) -> Option<u64> {
+        self.pruner_lo
+    }
+
+    /// The new `pruner_lo` is the current `reader_lo`, or `epoch_lo` for epoch-partitioned tables.
+    pub fn new_pruner_lo(&self) -> u64 {
+        self.entity.select_pruner_lo(self.epoch_lo, self.reader_lo)
+    }
+}
+
 impl StoredWatermark {
     pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self {
         StoredWatermark {
@@ -48,4 +112,19 @@ impl StoredWatermark {
             ..StoredWatermark::default()
         }
     }
+
+    pub fn from_lower_bound_update(
+        entity: &str,
+        epoch_lo: u64,
+        reader_lo: u64,
+        pruner_lo: u64,
+    ) -> Self {
+        StoredWatermark {
+            entity: entity.to_string(),
+            epoch_lo: epoch_lo as i64,
+            reader_lo: reader_lo as i64,
+            pruner_lo: Some(pruner_lo as i64),
+            ..StoredWatermark::default()
+        }
+    }
 }
diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs
index 80c645951308ed..682a0a2fc84680 100644
--- a/crates/sui-indexer/src/store/indexer_store.rs
+++ b/crates/sui-indexer/src/store/indexer_store.rs
@@ -12,6 +12,7 @@ use crate::models::display::StoredDisplay;
 use crate::models::obj_indices::StoredObjectVersion;
 use crate::models::objects::{StoredDeletedObject, StoredObject};
 use crate::models::raw_checkpoints::StoredRawCheckpoint;
+use crate::models::watermarks::{PrunableWatermark, StoredWatermark};
 use crate::types::{
     EventIndex, IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex,
 };
@@ -125,4 +126,14 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
     ) -> Result<(), IndexerError>
     where
         E::Iterator: Iterator<Item = E>;
+
+    /// Updates the lower bounds of the given watermark entries, based on each entry's new epoch
+    /// lower bound.
+    async fn update_watermarks_lower_bound(
+        &self,
+        watermarks: Vec<(PrunableWatermark, u64)>,
+    ) -> Result<(), IndexerError>;
+
+    /// Load all watermark entries from the store, and the latest timestamp from the db.
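To make the two-phase pruning semantics concrete, a hedged sketch of how a consumer might turn these accessors into prunable ranges; the function is hypothetical and not part of the diff:

```rust
use std::ops::Range;

// Data below `pruner_lo` can be dropped now; data in [`pruner_lo`, `reader_lo`)
// becomes prunable only after in-flight reads against the old bound have drained.
fn plan_pruning(w: &PrunableWatermark) -> (Option<Range<u64>>, Option<Range<u64>>) {
    let immediate = w.immediately_prunable_hi().map(|hi| 0..hi);
    let delayed = w.delayed_prunable_lo().map(|lo| lo..w.reader_lo);
    (immediate, delayed)
}
```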
+    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError>;
 }
diff --git a/crates/sui-indexer/src/store/mod.rs b/crates/sui-indexer/src/store/mod.rs
index 7c1aa9abcc107e..9d6bf65cc26b48 100644
--- a/crates/sui-indexer/src/store/mod.rs
+++ b/crates/sui-indexer/src/store/mod.rs
@@ -52,3 +52,42 @@ where
     })
     .await
 }
+
+pub async fn read_with_retry<'a, Q, T>(
+    pool: &ConnectionPool,
+    timeout: Duration,
+    query: Q,
+) -> Result<T, IndexerError>
+where
+    Q: for<'r> FnOnce(
+            &'r mut AsyncPgConnection,
+        ) -> ScopedBoxFuture<'a, 'r, Result<T, IndexerError>>
+        + Send,
+    Q: Clone,
+    T: 'a,
+{
+    let backoff = backoff::ExponentialBackoff {
+        max_elapsed_time: Some(timeout),
+        ..Default::default()
+    };
+    backoff::future::retry(backoff, || async {
+        let mut connection = pool.get().await.map_err(|e| backoff::Error::Transient {
+            err: IndexerError::PostgresWriteError(e.to_string()),
+            retry_after: None,
+        })?;
+
+        connection
+            .build_transaction()
+            .read_only()
+            .run(query.clone())
+            .await
+            .map_err(|e| {
+                tracing::error!("Error with reading data from DB: {:?}, retrying...", e);
+                backoff::Error::Transient {
+                    err: IndexerError::PostgresWriteError(e.to_string()),
+                    retry_after: None,
+                }
+            })
+    })
+    .await
+}
diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs
index 0750b658067071..bef9001dda9cdb 100644
--- a/crates/sui-indexer/src/store/pg_indexer_store.rs
+++ b/crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -45,7 +45,7 @@ use crate::models::objects::{
 };
 use crate::models::packages::StoredPackage;
 use crate::models::transactions::StoredTransaction;
-use crate::models::watermarks::StoredWatermark;
+use crate::models::watermarks::{PrunableWatermark, StoredWatermark};
 use crate::schema::{
     chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
     event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
@@ -55,7 +55,7 @@ use crate::schema::{
     tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds,
     watermarks,
 };
-use crate::store::transaction_with_retry;
+use crate::store::{read_with_retry, transaction_with_retry};
 use crate::types::{EventIndex, IndexedDeletedObject, IndexedObject};
 use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex};
 
@@ -251,7 +251,7 @@ impl PgIndexerStore {
             .context("Failed reading min prunable checkpoint sequence number from PostgresDB")
     }
 
-    async fn get_checkpoint_range_for_epoch(
+    pub async fn get_checkpoint_range_for_epoch(
         &self,
         epoch: u64,
     ) -> Result<(u64, Option<u64>), IndexerError> {
@@ -269,7 +269,7 @@ impl PgIndexerStore {
             .context("Failed reading checkpoint range from PostgresDB")
     }
 
-    async fn get_transaction_range_for_checkpoint(
+    pub async fn get_transaction_range_for_checkpoint(
         &self,
         checkpoint: u64,
     ) -> Result<(u64, u64), IndexerError> {
@@ -1550,6 +1550,140 @@ impl PgIndexerStore {
             tracing::error!("Failed to persist watermarks with error: {}", e);
         })
     }
+
+    async fn map_epochs_to_cp_tx(
+        &self,
+        epochs: &[u64],
+    ) -> Result<HashMap<u64, (u64, u64)>, IndexerError> {
+        use diesel_async::RunQueryDsl;
+
+        let mut connection = self.pool.get().await?;
+
+        let results: Vec<(i64, i64, Option<i64>)> = epochs::table
+            .filter(epochs::epoch.eq_any(epochs.iter().map(|&e| e as i64)))
+            .select((
+                epochs::epoch,
+                epochs::first_checkpoint_id,
+                epochs::first_tx_sequence_number,
+            ))
+            .load::<(i64, i64, Option<i64>)>(&mut connection)
+            .await
+            .map_err(Into::into)
+            .context("Failed to fetch first checkpoint and tx seq num for epochs")?;
+
+        Ok(results
+            .into_iter()
+            .map(|(epoch, checkpoint, tx)| {
+                (
+                    epoch as u64,
+                    (checkpoint as u64, tx.unwrap_or_default() as u64),
+                )
+            })
+            .collect())
+    }
+
+    async fn update_watermarks_lower_bound(
+        &self,
+        watermarks: Vec<(PrunableWatermark, u64)>,
+    ) -> Result<(), IndexerError> {
+        use diesel_async::RunQueryDsl;
+
+        let epochs: Vec<u64> = watermarks.iter().map(|(_table, epoch)| *epoch).collect();
+        let epoch_mapping = self.map_epochs_to_cp_tx(&epochs).await?;
+        let lookups: Result<Vec<StoredWatermark>, IndexerError> = watermarks
+            .into_iter()
+            .map(|(watermark, epoch)| {
+                let (checkpoint, tx) = epoch_mapping.get(&epoch).ok_or_else(|| {
+                    IndexerError::PersistentStorageDataCorruptionError(format!(
+                        "Epoch {} not found in epoch mapping",
+                        epoch
+                    ))
+                })?;
+
+                Ok(StoredWatermark::from_lower_bound_update(
+                    watermark.entity.as_ref(),
+                    epoch,
+                    watermark.entity.select_reader_lo(*checkpoint, *tx),
+                    watermark.new_pruner_lo(),
+                ))
+            })
+            .collect();
+        let lower_bound_updates = lookups?;
+
+        let guard = self
+            .metrics
+            .checkpoint_db_commit_latency_watermarks
+            .start_timer();
+
+        transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
+            async {
+                use diesel::dsl::sql;
+                use diesel::query_dsl::methods::FilterDsl;
+
+                diesel::insert_into(watermarks::table)
+                    .values(lower_bound_updates)
+                    .on_conflict(watermarks::entity)
+                    .do_update()
+                    .set((
+                        watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),
+                        watermarks::epoch_lo.eq(excluded(watermarks::epoch_lo)),
+                        watermarks::pruner_lo.eq(excluded(watermarks::pruner_lo)),
+                        watermarks::timestamp_ms.eq(sql::<diesel::sql_types::BigInt>(
+                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
+                        )),
+                    ))
+                    .filter(excluded(watermarks::reader_lo).gt(watermarks::reader_lo))
+                    .filter(excluded(watermarks::epoch_lo).gt(watermarks::epoch_lo))
+                    .filter(
+                        diesel::dsl::sql::<diesel::sql_types::BigInt>(
+                            "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
+                        )
+                        .gt(watermarks::timestamp_ms),
+                    )
+                    .execute(conn)
+                    .await?;
+
+                Ok::<(), IndexerError>(())
+            }
+            .scope_boxed()
+        })
+        .await
+        .tap_ok(|_| {
+            let elapsed = guard.stop_and_record();
+            info!(elapsed, "Persisted watermarks");
+        })
+        .tap_err(|e| {
+            tracing::error!("Failed to persist watermarks with error: {}", e);
+        })
+    }
+
+    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
+        use diesel_async::RunQueryDsl;
+
+        // Use a read-only transaction; otherwise this query would block, and be blocked by, write
+        // transactions on the same table.
+        read_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
+            async {
+                let stored = watermarks::table
+                    .load::<StoredWatermark>(conn)
+                    .await
+                    .map_err(Into::into)
+                    .context("Failed reading watermarks from PostgresDB")?;
+
+                let timestamp = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
+                    "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
+                ))
+                .get_result(conn)
+                .await
+                .map_err(Into::into)
+                .context("Failed reading current timestamp from PostgresDB")?;
+
+                Ok((stored, timestamp))
+            }
+            .scope_boxed()
+        })
+        .await
+    }
 }
 
 #[async_trait]
@@ -2250,6 +2384,17 @@ impl IndexerStore for PgIndexerStore {
     {
         self.update_watermarks_upper_bound::<E>(watermark).await
     }
+
+    async fn update_watermarks_lower_bound(
+        &self,
+        watermarks: Vec<(PrunableWatermark, u64)>,
+    ) -> Result<(), IndexerError> {
+        self.update_watermarks_lower_bound(watermarks).await
+    }
+
+    async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError> {
+        self.get_watermarks().await
+    }
 }
 
 fn make_objects_history_to_commit(
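The conflict-update in `update_watermarks_lower_bound` is guarded so that rows only ever move forward. A plain-Rust restatement of those `.filter(...)` clauses (illustrative only; the real check runs in SQL as part of the `DO UPDATE`, and the function name here is hypothetical):

```rust
// A stored row is only overwritten if both lower bounds advance strictly and its
// `timestamp_ms` is older than the database clock read at update time.
fn should_apply_lower_bound(existing: &StoredWatermark, update: &StoredWatermark, db_now_ms: i64) -> bool {
    update.reader_lo > existing.reader_lo
        && update.epoch_lo > existing.epoch_lo
        && db_now_ms > existing.timestamp_ms
}
```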