Skip to content

Commit

Permalink
Update pruner to start a separate task that will update reader lower …
Browse files Browse the repository at this point in the history
…bound watermarks, without doing any actual pruning.
  • Loading branch information
wlmyng committed Oct 16, 2024
1 parent 31b15dd commit 856a957
Show file tree
Hide file tree
Showing 7 changed files with 408 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ CREATE TABLE watermarks
-- be dropped. The pruner uses this column to determine whether to prune or wait long enough
-- that all in-flight reads complete or timeout before it acts on an updated watermark.
timestamp_ms BIGINT NOT NULL,
-- Column used by the pruner to track its true progress. Data at and below this watermark can
-- be immediately pruned.
-- Updated and used by the pruner. Data up to and excluding this watermark can be immediately
-- dropped. Data between this and `reader_lo` can be pruned after a delay.
pruner_lo BIGINT,
PRIMARY KEY (entity)
);
3 changes: 2 additions & 1 deletion crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ pub trait Handler<T>: Send + Sync {

/// The indexer writer operates on checkpoint data, which contains information on the current epoch,
/// checkpoint, and transaction. These three numbers form the watermark upper bound for each
/// committed table.
/// committed table. The reader and pruner are responsible for determining which of the three units
/// will be used for a particular table.
#[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub struct CommitterWatermark {
pub epoch: u64,
Expand Down
123 changes: 121 additions & 2 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use mysten_metrics::spawn_monitored_task;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
Expand All @@ -10,6 +11,7 @@ use tracing::{error, info};

use crate::config::RetentionConfig;
use crate::errors::IndexerError;
use crate::models::watermarks::PrunableWatermark;
use crate::store::pg_partition_manager::PgPartitionManager;
use crate::store::PgIndexerStore;
use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};
Expand All @@ -23,8 +25,10 @@ pub struct Pruner {
pub metrics: IndexerMetrics,
}

/// Enum representing tables that the pruner is allowed to prune. The pruner will ignore any table
/// that is not listed here.
/// Enum representing tables that the pruner is allowed to prune. This corresponds to table names in
/// the database, and should be used in lieu of string literals. This enum is also meant to
/// facilitate the process of determining which unit (epoch, cp, or tx) should be used for the
/// table's range. Pruner will ignore any table that is not listed here.
#[derive(
Debug,
Eq,
Expand Down Expand Up @@ -69,6 +73,48 @@ pub enum PrunableTable {
PrunerCpWatermark,
}

impl PrunableTable {
pub fn select_reader_lo(&self, cp: u64, tx: u64) -> u64 {
match self {
PrunableTable::ObjectsHistory => cp,
PrunableTable::Transactions => tx,
PrunableTable::Events => tx,

PrunableTable::EventEmitPackage => tx,
PrunableTable::EventEmitModule => tx,
PrunableTable::EventSenders => tx,
PrunableTable::EventStructInstantiation => tx,
PrunableTable::EventStructModule => tx,
PrunableTable::EventStructName => tx,
PrunableTable::EventStructPackage => tx,

PrunableTable::TxAffectedAddresses => tx,
PrunableTable::TxAffectedObjects => tx,
PrunableTable::TxCallsPkg => tx,
PrunableTable::TxCallsMod => tx,
PrunableTable::TxCallsFun => tx,
PrunableTable::TxChangedObjects => tx,
PrunableTable::TxDigests => tx,
PrunableTable::TxInputObjects => tx,
PrunableTable::TxKinds => tx,
PrunableTable::TxRecipients => tx,
PrunableTable::TxSenders => tx,

PrunableTable::Checkpoints => cp,
PrunableTable::PrunerCpWatermark => cp,
}
}

pub fn select_pruner_lo(&self, epoch_lo: u64, reader_lo: u64) -> u64 {
match self {
PrunableTable::ObjectsHistory => epoch_lo,
PrunableTable::Transactions => epoch_lo,
PrunableTable::Events => epoch_lo,
_ => reader_lo,
}
}
}

impl Pruner {
/// Instantiates a pruner with default retention and overrides. Pruner will finalize the
/// retention policies so there is a value for every prunable table.
Expand Down Expand Up @@ -101,6 +147,15 @@ impl Pruner {
}

pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
let store_clone = self.store.clone();
let retention_policies = self.retention_policies.clone();
let cancel_clone = cancel.clone();
spawn_monitored_task!(update_watermarks_lower_bounds_task(
store_clone,
retention_policies,
cancel_clone
));

let mut last_seen_max_epoch = 0;
// The first epoch that has not yet been pruned.
let mut next_prune_epoch = None;
Expand Down Expand Up @@ -177,3 +232,67 @@ impl Pruner {
Ok(())
}
}

/// Task to periodically query the `watermarks` table and update the lower bounds for all watermarks
/// if the entry exceeds epoch-level retention policy.
///
/// Runs one update pass every 5 seconds until `cancel` is triggered, at which point it returns
/// `Ok(())`. A failed pass is logged and retried on the next tick rather than ending the task:
/// the task is spawned detached, so an early return would silently stop all further lower bound
/// updates for the lifetime of the process.
async fn update_watermarks_lower_bounds_task(
    store: PgIndexerStore,
    retention_policies: HashMap<PrunableTable, u64>,
    cancel: CancellationToken,
) -> IndexerResult<()> {
    let mut interval = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                info!("Pruner watermark lower bound update task cancelled.");
                return Ok(());
            }
            _ = interval.tick() => {
                if let Err(e) =
                    update_watermarks_lower_bounds(&store, &retention_policies, &cancel).await
                {
                    error!(
                        "Failed to update watermarks lower bounds, retrying on next tick: {:?}",
                        e
                    );
                }
            }
        }
    }
}

/// Fetches all entries from the `watermarks` table, and updates the lower bounds for each entry
/// whose epoch range exceeds the respective retention policy.
///
/// Entries that cannot be converted into a `PrunableWatermark`, or whose table has no retention
/// policy, are skipped. If `cancel` is triggered while the entries are being scanned, the
/// function returns `Ok(())` without writing anything.
async fn update_watermarks_lower_bounds(
    store: &PgIndexerStore,
    retention_policies: &HashMap<PrunableTable, u64>,
    cancel: &CancellationToken,
) -> IndexerResult<()> {
    let (watermarks, latest_db_timestamp) = store.get_watermarks().await?;
    let mut lower_bound_updates = vec![];

    // The fetched rows are not needed afterwards, so consume them instead of cloning each one.
    for stored in watermarks {
        if cancel.is_cancelled() {
            info!("Pruner watermark lower bound update task cancelled.");
            return Ok(());
        }

        let Some(watermark) = PrunableWatermark::new(stored, latest_db_timestamp) else {
            continue;
        };

        let Some(&epochs_to_keep) = retention_policies.get(&watermark.entity) else {
            continue;
        };

        // A retention of 0 would underflow `epochs_to_keep - 1` below; treat it as "keep only the
        // latest epoch", which is the tightest bound the arithmetic below can express.
        let epochs_to_keep = epochs_to_keep.max(1);

        // `checked_add` so that a very large retention cannot overflow; an overflowing sum is
        // necessarily greater than `epoch_hi_inclusive`, i.e. retention is not exceeded.
        let exceeds_retention = watermark
            .epoch_lo
            .checked_add(epochs_to_keep)
            .is_some_and(|first_excess_epoch| first_excess_epoch <= watermark.epoch_hi_inclusive);

        if exceeds_retention {
            // Keep the most recent `epochs_to_keep` epochs, including `epoch_hi_inclusive`.
            let new_epoch_lo = watermark
                .epoch_hi_inclusive
                .saturating_sub(epochs_to_keep - 1);

            lower_bound_updates.push((watermark, new_epoch_lo));
        }
    }

    if !lower_bound_updates.is_empty() {
        store
            .update_watermarks_lower_bound(lower_bound_updates)
            .await?;
        info!("Finished updating lower bounds for watermarks");
    }

    Ok(())
}
89 changes: 84 additions & 5 deletions crates/sui-indexer/src/models/watermarks.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::str::FromStr;

use diesel::prelude::*;

use crate::{
handlers::CommitterWatermark,
handlers::{pruner::PrunableTable, CommitterWatermark},
schema::watermarks::{self},
};
use diesel::prelude::*;

/// Represents a row in the `watermarks` table.
#[derive(Queryable, Insertable, Default, QueryableByName)]
#[derive(Queryable, Insertable, Default, QueryableByName, Clone)]
#[diesel(table_name = watermarks, primary_key(entity))]
pub struct StoredWatermark {
/// The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`.
Expand All @@ -33,11 +36,72 @@ pub struct StoredWatermark {
/// be dropped. The pruner uses this column to determine whether to prune or wait long enough
/// that all in-flight reads complete or timeout before it acts on an updated watermark.
pub timestamp_ms: i64,
/// Column used by the pruner to track its true progress. Data at and below this watermark can
/// be immediately pruned.
/// Updated and used by the pruner. Data up to and excluding this watermark can be immediately
/// dropped. Data between this and `reader_lo` can be pruned after a delay.
pub pruner_lo: Option<i64>,
}

/// A `watermarks` row for a table the pruner is allowed to prune, with the entity parsed into a
/// `PrunableTable` and the numeric bounds converted to `u64`.
#[derive(Debug)]
pub struct PrunableWatermark {
/// The prunable table governed by this watermark.
pub entity: PrunableTable,
/// Inclusive upper bound of the table's range, in epochs.
pub epoch_hi_inclusive: u64,
/// Lower bound of the table's range, in epochs.
pub epoch_lo: u64,
/// Inclusive upper bound of the table's range, in checkpoints.
pub checkpoint_hi_inclusive: u64,
/// Inclusive upper bound of the table's range, in transactions.
pub tx_hi_inclusive: u64,
/// Lower bound for readers, expressed in the unit used by this table.
pub reader_lo: u64,
/// Timestamp when the watermark's lower bound was last updated.
pub timestamp_ms: i64,
/// Latest timestamp read from db.
pub current_timestamp_ms: i64,
/// Lower bound tracked by the pruner, `None` if the row has no value for it yet. Data up to and
/// excluding `pruner_lo` can be pruned immediately.
pub pruner_lo: Option<u64>,
}

impl PrunableWatermark {
    /// Converts a stored `watermarks` row into a `PrunableWatermark`.
    ///
    /// Returns `None` if the row's entity does not name a `PrunableTable`, or if any of its
    /// bounds is negative. Negative bounds are rejected rather than cast, because `as u64` would
    /// silently turn them into enormous watermarks.
    pub fn new(stored: StoredWatermark, latest_db_timestamp: i64) -> Option<Self> {
        let entity = PrunableTable::from_str(&stored.entity).ok()?;

        Some(PrunableWatermark {
            entity,
            epoch_hi_inclusive: u64::try_from(stored.epoch_hi_inclusive).ok()?,
            epoch_lo: u64::try_from(stored.epoch_lo).ok()?,
            checkpoint_hi_inclusive: u64::try_from(stored.checkpoint_hi_inclusive).ok()?,
            tx_hi_inclusive: u64::try_from(stored.tx_hi_inclusive).ok()?,
            reader_lo: u64::try_from(stored.reader_lo).ok()?,
            timestamp_ms: stored.timestamp_ms,
            current_timestamp_ms: latest_db_timestamp,
            // A missing `pruner_lo` stays `None`; a present but negative one rejects the row.
            pruner_lo: stored.pruner_lo.map(u64::try_from).transpose().ok()?,
        })
    }

    /// Advances the watermark's lower bounds to `new_epoch_lo` and `new_reader_lo`, recording the
    /// lower bound that was in effect before the update as the new `pruner_lo`.
    pub fn update(&mut self, new_epoch_lo: u64, new_reader_lo: u64) {
        // Must happen before the bounds are overwritten: the new `pruner_lo` is derived from the
        // current `epoch_lo` / `reader_lo`, using the same per-table choice as `new_pruner_lo`.
        self.pruner_lo = Some(self.new_pruner_lo());

        self.epoch_lo = new_epoch_lo;
        self.reader_lo = new_reader_lo;
    }

    /// Represents the exclusive upper bound of data that can be pruned immediately.
    pub fn immediately_prunable_hi(&self) -> Option<u64> {
        self.pruner_lo
    }

    /// Represents the lower bound of data that can be pruned after a delay.
    pub fn delayed_prunable_lo(&self) -> Option<u64> {
        self.pruner_lo
    }

    /// The new `pruner_lo` is the current reader_lo, or epoch_lo for epoch-partitioned tables.
    pub fn new_pruner_lo(&self) -> u64 {
        self.entity.select_pruner_lo(self.epoch_lo, self.reader_lo)
    }
}

impl StoredWatermark {
pub fn from_upper_bound_update(entity: &str, watermark: CommitterWatermark) -> Self {
StoredWatermark {
Expand All @@ -48,4 +112,19 @@ impl StoredWatermark {
..StoredWatermark::default()
}
}

pub fn from_lower_bound_update(
entity: &str,
epoch_lo: u64,
reader_lo: u64,
pruner_lo: u64,
) -> Self {
StoredWatermark {
entity: entity.to_string(),
epoch_lo: epoch_lo as i64,
reader_lo: reader_lo as i64,
pruner_lo: Some(pruner_lo as i64),
..StoredWatermark::default()
}
}
}
11 changes: 11 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::models::display::StoredDisplay;
use crate::models::obj_indices::StoredObjectVersion;
use crate::models::objects::{StoredDeletedObject, StoredObject};
use crate::models::raw_checkpoints::StoredRawCheckpoint;
use crate::models::watermarks::{PrunableWatermark, StoredWatermark};
use crate::types::{
EventIndex, IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex,
};
Expand Down Expand Up @@ -125,4 +126,14 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
) -> Result<(), IndexerError>
where
E::Iterator: Iterator<Item: AsRef<str>>;

/// Updates the lower bounds of watermark entries in the store.
///
/// Each element of `watermarks` pairs a watermark entry with the new epoch lower bound to apply
/// to it.
async fn update_watermarks_lower_bound(
&self,
watermarks: Vec<(PrunableWatermark, u64)>,
) -> Result<(), IndexerError>;

/// Loads all watermark entries from the store, together with the latest timestamp from the db.
///
/// Returns `(entries, latest_db_timestamp)`.
// NOTE(review): the timestamp is presumably in milliseconds, since callers store it in
// `current_timestamp_ms` -- confirm against the implementation.
async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError>;
}
39 changes: 39 additions & 0 deletions crates/sui-indexer/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,42 @@ where
})
.await
}

/// Runs `query` inside a read-only transaction on a connection taken from `pool`, retrying with
/// exponential backoff until it succeeds or `timeout` has elapsed.
///
/// Every failure, whether acquiring a connection or running the query, is treated as transient
/// and retried. `query` must be `Clone` because a fresh copy is consumed on each attempt.
///
/// # Errors
///
/// Returns `IndexerError::PostgresReadError` carrying the last failure's message once the retry
/// budget is exhausted.
pub async fn read_with_retry<'a, Q, T>(
    pool: &ConnectionPool,
    timeout: Duration,
    query: Q,
) -> Result<T, IndexerError>
where
    Q: for<'r> FnOnce(
            &'r mut AsyncPgConnection,
        ) -> ScopedBoxFuture<'a, 'r, Result<T, IndexerError>>
        + Send,
    Q: Clone,
    T: 'a,
{
    let backoff = backoff::ExponentialBackoff {
        max_elapsed_time: Some(timeout),
        ..Default::default()
    };
    backoff::future::retry(backoff, || async {
        // This is a read path, so failures are reported as read errors rather than write errors.
        let mut connection = pool.get().await.map_err(|e| backoff::Error::Transient {
            err: IndexerError::PostgresReadError(e.to_string()),
            retry_after: None,
        })?;

        connection
            .build_transaction()
            .read_only()
            .run(query.clone())
            .await
            .map_err(|e| {
                tracing::error!("Error with reading data from DB: {:?}, retrying...", e);
                backoff::Error::Transient {
                    err: IndexerError::PostgresReadError(e.to_string()),
                    retry_after: None,
                }
            })
    })
    .await
}
Loading

0 comments on commit 856a957

Please sign in to comment.