Skip to content

Commit

Permalink
it runs
Browse files Browse the repository at this point in the history
  • Loading branch information
wlmyng committed Oct 2, 2024
1 parent 09e384e commit 5a9a066
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 6 deletions.
79 changes: 79 additions & 0 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

use crate::config::RetentionPolicies;
use crate::errors::IndexerError;
use crate::models::watermarks::StoredWatermark;
use crate::store::pg_partition_manager::PgPartitionManager;
use crate::store::PgIndexerStore;
use crate::{metrics::IndexerMetrics, store::IndexerStore, types::IndexerResult};
use mysten_metrics::spawn_monitored_task;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
Expand Down Expand Up @@ -146,6 +148,15 @@ impl Pruner {
}

pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
let store_clone = self.store.clone();
let retention_policies = self.retention_policies.clone();
let cancel_clone = cancel.clone();
spawn_monitored_task!(update_watermarks_lower_bounds_task(
store_clone,
retention_policies,
cancel_clone
));

let mut last_seen_max_epoch = 0;
// The first epoch that has not yet been pruned.
let mut next_prune_epoch = None;
Expand Down Expand Up @@ -222,3 +233,71 @@ impl Pruner {
Ok(())
}
}

/// Background task that refreshes the lower bounds stored in the `watermarks` table.
///
/// Every 5 seconds the task calls [`update_watermarks_lower_bounds`] with the given `store` and
/// `retention_policies`. It only returns (with `Ok(())`) once `cancel` is triggered.
///
/// A failed refresh is logged and attempted again on the next tick instead of being propagated
/// with `?`: propagating would end the task on the first error, and since the handle returned by
/// `spawn_monitored_task!` in `Pruner::start` is not kept, that error would go unnoticed while
/// the lower bounds silently stopped advancing.
async fn update_watermarks_lower_bounds_task(
    store: PgIndexerStore,
    retention_policies: HashMap<PrunableTable, u64>,
    cancel: CancellationToken,
) -> IndexerResult<()> {
    let mut interval = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                info!("Pruner watermark lower bound update task cancelled.");
                return Ok(());
            }
            _ = interval.tick() => {
                if let Err(e) =
                    update_watermarks_lower_bounds(&store, &retention_policies, &cancel).await
                {
                    // Keep the task alive; the next tick retries from a fresh read of the table.
                    tracing::error!(
                        "Failed to update watermark lower bounds, will retry on next tick: {}",
                        e
                    );
                }
            }
        }
    }
}

/// Reads every entry of the `watermarks` table and advances the lower bound of each entry whose
/// epoch range has outgrown its retention policy.
///
/// For an entry with a policy of `epochs_to_keep`, the new inclusive epoch lower bound is
/// `epoch_hi - (epochs_to_keep - 1)`. The matching reader lower bound is derived from the first
/// checkpoint of that epoch and the first transaction of that checkpoint, via
/// `PrunableTable::map_to_reader_unit`. Entries without a retention policy are left untouched.
///
/// All qualifying updates are written in a single call to
/// `PgIndexerStore::update_watermarks_lower_bound`. If `cancel` fires while entries are still
/// being examined, the function returns `Ok(())` without writing anything.
///
/// # Errors
/// Propagates any error from the store reads or from the final write.
async fn update_watermarks_lower_bounds(
    store: &PgIndexerStore,
    retention_policies: &HashMap<PrunableTable, u64>,
    cancel: &CancellationToken,
) -> IndexerResult<()> {
    let watermarks = store.get_watermarks().await?;
    let mut lower_bound_updates = vec![];

    for watermark in watermarks.iter() {
        if cancel.is_cancelled() {
            info!("Pruner watermark lower bound update task cancelled.");
            return Ok(());
        }

        let Some(&epochs_to_keep) = retention_policies.get(&watermark.entity) else {
            continue;
        };

        // A retention of 0 would make `epochs_to_keep - 1` underflow (panic in debug builds,
        // wrap-around in release builds). Treat it as "keep only the latest epoch".
        let epochs_to_keep = epochs_to_keep.max(1);

        // `saturating_add` keeps an oversized retention value from overflowing the comparison.
        if watermark.epoch_lo.saturating_add(epochs_to_keep) <= watermark.epoch_hi {
            let new_inclusive_epoch_lower_bound =
                watermark.epoch_hi.saturating_sub(epochs_to_keep - 1);

            let (min_cp, _) = store
                .get_checkpoint_range_for_epoch(new_inclusive_epoch_lower_bound)
                .await?;
            let (min_tx, _) = store.get_transaction_range_for_checkpoint(min_cp).await?;

            lower_bound_updates.push(StoredWatermark::from_lower_bound_update(
                watermark.entity.as_ref(),
                new_inclusive_epoch_lower_bound,
                watermark.entity.map_to_reader_unit(min_cp, min_tx),
            ))
        }
    }

    if !lower_bound_updates.is_empty() {
        store
            .update_watermarks_lower_bound(lower_bound_updates)
            .await?;
        info!("Finished updating lower bounds for watermarks");
    }

    Ok(())
}
64 changes: 62 additions & 2 deletions crates/sui-indexer/src/models/watermarks.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::schema::watermarks::{self};
use std::str::FromStr;

use crate::{
errors::IndexerError,
handlers::pruner::PrunableTable,
schema::watermarks::{self},
};
use diesel::prelude::*;

/// Represents a row in the `watermarks` table.
#[derive(Queryable, Insertable, Default, QueryableByName)]
#[derive(Queryable, Insertable, Default, QueryableByName, Clone)]
#[diesel(table_name = watermarks, primary_key(entity))]
pub struct StoredWatermark {
/// The table name of group of tables governed by this watermark, i.e `epochs`, `checkpoints`,
Expand All @@ -31,6 +37,51 @@ pub struct StoredWatermark {
pub pruned_lo: Option<i64>,
}

/// Typed, read-side view of a row in the `watermarks` table.
///
/// Built by [`WatermarkRead::new`] from a [`StoredWatermark`]: the entity name is parsed into a
/// [`PrunableTable`] and the signed database columns are exposed as `u64`.
#[derive(Debug)]
pub struct WatermarkRead {
    /// The prunable table this watermark belongs to, parsed from the stored `entity` string.
    pub entity: PrunableTable,
    /// Value of the `epoch_hi` column.
    pub epoch_hi: u64,
    /// Value of the `epoch_lo` column.
    pub epoch_lo: u64,
    /// Value of the `checkpoint_hi` column.
    pub checkpoint_hi: u64,
    /// Value of the `reader_hi` column.
    pub reader_hi: u64,
    /// Value of the `reader_lo` column.
    pub reader_lo: u64,
    /// Timestamp when the watermark's lower bound was last updated.
    pub timestamp_ms: i64,
    /// Latest timestamp read from db.
    pub current_timestamp_ms: i64,
    /// Data at and below `pruned_lo` is considered pruned by the pruner.
    pub pruned_lo: Option<u64>,
}

impl WatermarkRead {
pub fn new(stored: StoredWatermark, latest_db_timestamp: i64) -> Result<Self, IndexerError> {
let entity = PrunableTable::from_str(&stored.entity).map_err(|e| {
IndexerError::PersistentStorageDataCorruptionError(format!(
"Unexpected entity in watermarks table: {}",
e
))
})?;

Ok(WatermarkRead {
entity,
epoch_hi: stored.epoch_hi as u64,
epoch_lo: stored.epoch_lo as u64,
checkpoint_hi: stored.checkpoint_hi as u64,
reader_hi: stored.reader_hi as u64,
reader_lo: stored.reader_lo as u64,
timestamp_ms: stored.timestamp_ms,
current_timestamp_ms: latest_db_timestamp,
pruned_lo: stored.pruned_lo.map(|lo| lo as u64),
})
}

/// Represents the first `unit` (checkpoint, tx, epoch) that has not yet been pruned. If
/// `pruned_lo` is not set in db, default to 0. Otherwise, this is `pruned_lo + `.
pub fn pruner_lo(&self) -> u64 {
self.pruned_lo.map_or(0, |lo| lo.saturating_add(1))
}
}

impl StoredWatermark {
pub fn from_upper_bound_update(
entity: &str,
Expand All @@ -46,4 +97,13 @@ impl StoredWatermark {
..StoredWatermark::default()
}
}

pub fn from_lower_bound_update(entity: &str, epoch_lo: u64, reader_lo: u64) -> Self {
StoredWatermark {
entity: entity.to_string(),
epoch_lo: epoch_lo as i64,
reader_lo: reader_lo as i64,
..StoredWatermark::default()
}
}
}
8 changes: 8 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::models::display::StoredDisplay;
use crate::models::obj_indices::StoredObjectVersion;
use crate::models::objects::{StoredDeletedObject, StoredObject};
use crate::models::raw_checkpoints::StoredRawCheckpoint;
use crate::models::watermarks::{StoredWatermark, WatermarkRead};
use crate::types::{
EventIndex, IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex,
};
Expand Down Expand Up @@ -123,4 +124,11 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
cp: u64,
tx: u64,
) -> Result<(), IndexerError>;

/// Persists lower-bound updates for the given watermark rows.
///
/// NOTE(review): which columns are written and under what conditions is decided by the
/// implementor; this trait only fixes the signature.
async fn update_watermarks_lower_bound(
    &self,
    watermarks: Vec<StoredWatermark>,
) -> Result<(), IndexerError>;

/// Returns the watermark entries known to the store, in their typed [`WatermarkRead`] form.
async fn get_watermarks(&self) -> Result<Vec<WatermarkRead>, IndexerError>;
}
39 changes: 39 additions & 0 deletions crates/sui-indexer/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,42 @@ where
})
.await
}

pub async fn read_with_retry<'a, Q, T>(
pool: &ConnectionPool,
timeout: Duration,
query: Q,
) -> Result<T, IndexerError>
where
Q: for<'r> FnOnce(
&'r mut AsyncPgConnection,
) -> ScopedBoxFuture<'a, 'r, Result<T, IndexerError>>
+ Send,
Q: Clone,
T: 'a,
{
let backoff = backoff::ExponentialBackoff {
max_elapsed_time: Some(timeout),
..Default::default()
};
backoff::future::retry(backoff, || async {
let mut connection = pool.get().await.map_err(|e| backoff::Error::Transient {
err: IndexerError::PostgresWriteError(e.to_string()),
retry_after: None,
})?;

connection
.build_transaction()
.read_only()
.run(query.clone())
.await
.map_err(|e| {
tracing::error!("Error with reading data from DB: {:?}, retrying...", e);
backoff::Error::Transient {
err: IndexerError::PostgresWriteError(e.to_string()),
retry_after: None,
}
})
})
.await
}
106 changes: 102 additions & 4 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::models::objects::{
};
use crate::models::packages::StoredPackage;
use crate::models::transactions::StoredTransaction;
use crate::models::watermarks::StoredWatermark;
use crate::models::watermarks::{StoredWatermark, WatermarkRead};
use crate::schema::{
chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package,
event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
Expand All @@ -55,7 +55,7 @@ use crate::schema::{
tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, tx_kinds,
tx_recipients, tx_senders, watermarks,
};
use crate::store::transaction_with_retry;
use crate::store::{read_with_retry, transaction_with_retry};
use crate::types::{EventIndex, IndexedDeletedObject, IndexedObject};
use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex};

Expand Down Expand Up @@ -250,7 +250,7 @@ impl PgIndexerStore {
.context("Failed reading min prunable checkpoint sequence number from PostgresDB")
}

async fn get_checkpoint_range_for_epoch(
pub async fn get_checkpoint_range_for_epoch(
&self,
epoch: u64,
) -> Result<(u64, Option<u64>), IndexerError> {
Expand All @@ -268,7 +268,7 @@ impl PgIndexerStore {
.context("Failed reading checkpoint range from PostgresDB")
}

async fn get_transaction_range_for_checkpoint(
pub async fn get_transaction_range_for_checkpoint(
&self,
checkpoint: u64,
) -> Result<(u64, u64), IndexerError> {
Expand Down Expand Up @@ -1514,6 +1514,93 @@ impl PgIndexerStore {
tracing::error!("Failed to persist watermarks with error: {}", e);
})
}

/// Upserts the lower-bound columns of the given rows into the `watermarks` table.
///
/// The statement is an `INSERT ... ON CONFLICT (entity) DO UPDATE` that sets `reader_lo`,
/// `epoch_lo` and `timestamp_ms` (the latter to the database's current time in milliseconds).
/// The update only applies to a conflicting row when all three filters below hold: the incoming
/// `reader_lo` and `epoch_lo` are both strictly greater than the stored ones, and the database's
/// current time is later than the stored `timestamp_ms`.
///
/// The write runs through `transaction_with_retry`; its latency is recorded in the
/// `checkpoint_db_commit_latency_watermarks` metric on success, and failures are logged.
async fn update_watermarks_lower_bound(
    &self,
    watermarks: Vec<StoredWatermark>,
) -> Result<(), IndexerError> {
    use diesel_async::RunQueryDsl;

    let guard = self
        .metrics
        .checkpoint_db_commit_latency_watermarks
        .start_timer();

    transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
        // Each invocation of this closure gets its own copy of the rows to insert.
        let lower_bound_updates = watermarks.clone();
        async {
            use diesel::dsl::sql;
            use diesel::query_dsl::methods::FilterDsl;

            diesel::insert_into(watermarks::table)
                .values(lower_bound_updates)
                .on_conflict(watermarks::entity)
                .do_update()
                .set((
                    watermarks::reader_lo.eq(excluded(watermarks::reader_lo)),
                    watermarks::epoch_lo.eq(excluded(watermarks::epoch_lo)),
                    // Stamp the row with the database clock, in milliseconds since the epoch.
                    watermarks::timestamp_ms.eq(sql::<diesel::sql_types::BigInt>(
                        "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                    )),
                ))
                // Lower bounds may only move forward.
                .filter(excluded(watermarks::reader_lo).gt(watermarks::reader_lo))
                .filter(excluded(watermarks::epoch_lo).gt(watermarks::epoch_lo))
                // Skip rows whose stored timestamp is not older than the database's "now".
                .filter(
                    diesel::dsl::sql::<diesel::sql_types::BigInt>(
                        "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
                    )
                    .gt(watermarks::timestamp_ms),
                )
                .execute(conn)
                .await?;

            Ok::<(), IndexerError>(())
        }
        .scope_boxed()
    })
    .await
    .tap_ok(|_| {
        let elapsed = guard.stop_and_record();
        info!(elapsed, "Persisted watermarks");
    })
    .tap_err(|e| {
        tracing::error!("Failed to persist watermarks with error: {}", e);
    })
}

/// Loads every row of the `watermarks` table together with the database's current time, and
/// returns the rows as [`WatermarkRead`] values.
///
/// Both queries run in the read-only transaction provided by `read_with_retry`. The current
/// time (milliseconds since the Unix epoch, as computed by the database) is attached to every
/// returned watermark.
///
/// # Errors
/// Fails if either query fails or if a row cannot be converted by `WatermarkRead::new`.
async fn get_watermarks(&self) -> Result<Vec<WatermarkRead>, IndexerError> {
    use diesel_async::RunQueryDsl;

    read_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
        async {
            let rows = watermarks::table
                .load::<StoredWatermark>(conn)
                .await
                .map_err(IndexerError::from)
                .context("Failed reading watermarks from PostgresDB")?;

            let db_now_ms: i64 = diesel::select(diesel::dsl::sql::<diesel::sql_types::BigInt>(
                "(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) * 1000)::bigint",
            ))
            .get_result(conn)
            .await
            .map_err(IndexerError::from)
            .context("Failed reading current timestamp from PostgresDB")?;

            rows.into_iter()
                .map(|row| WatermarkRead::new(row, db_now_ms))
                .collect::<Result<Vec<_>, _>>()
        }
        .scope_boxed()
    })
    .await
}
}

#[async_trait]
Expand Down Expand Up @@ -2181,6 +2268,17 @@ impl IndexerStore for PgIndexerStore {
self.update_watermarks_upper_bound(tables, epoch, cp, tx)
.await
}

/// `IndexerStore` entry point for lower-bound updates.
///
/// Delegates to the inherent `PgIndexerStore::update_watermarks_lower_bound`; inherent methods
/// take precedence over trait methods in method-call resolution, so this is not recursive.
async fn update_watermarks_lower_bound(
    &self,
    watermarks: Vec<StoredWatermark>,
) -> Result<(), IndexerError> {
    self.update_watermarks_lower_bound(watermarks).await
}

/// `IndexerStore` entry point for reading watermarks.
///
/// Delegates to the inherent `PgIndexerStore::get_watermarks`; inherent methods take precedence
/// over trait methods in method-call resolution, so this is not recursive.
async fn get_watermarks(&self) -> Result<Vec<WatermarkRead>, IndexerError> {
    self.get_watermarks().await
}
}

fn make_objects_history_to_commit(
Expand Down

0 comments on commit 5a9a066

Please sign in to comment.