From 6fe410dfbbc78b81d926ae1c88b5f8edc6e266d2 Mon Sep 17 00:00:00 2001
From: Ashok Menon
Date: Mon, 11 Nov 2024 02:43:06 +0000
Subject: [PATCH] indexer-alt: pruner task

## Description

Add the task that actually deletes data, based on the reader low watermark.

## Test plan

Run the indexer and note the following:

- Metrics related to rows deleted by the pruner (from `localhost:9184/metrics`)
- The contents of the `watermarks` table.

```
sui$ cargo run -p sui-indexer-alt --release --                                    \
  --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \
  indexer --remote-store-url https://checkpoints.mainnet.sui.io                  \
  --last-checkpoint 10000                                                        \
  --consistent-range 100 --consistent-pruning-interval 10                        \
  --pipeline sum_obj_types --pipeline wal_obj_types
```

Also tested running the indexer for an extended period of time (1M checkpoints
over roughly half an hour in local testing), and noted how the pruner behaves
when configured as it would be in production (roughly one hour of consistent
range, a 5 minute pruning interval, and a 2 minute pruning delay):

- Many rows accumulated during backfill -- by the end of the 1M checkpoints,
  the pruner had only pruned up to somewhere between checkpoint 500K and
  checkpoint 700K, depending on the pipeline. This should not be an issue
  under normal operation, where the indexer will run for long enough for
  pruning to stabilise at the tip of the network (and it would be recommended
  practice to start from a formal snapshot, and therefore only need to run
  pruning from that point forward).

- Because the reader watermark task and the pruner task use the same
  interval, it can take up to two ticks of that interval for the pruner to
  act on a change to its upper bound -- again, this should be okay, as the
  pruner's interval should be at least an order of magnitude smaller than its
  retention period.
---
 crates/sui-indexer-alt/src/args.rs            |   9 +
 .../src/handlers/wal_coin_balances.rs         |   8 +
 .../src/handlers/wal_obj_types.rs             |   8 +
 crates/sui-indexer-alt/src/main.rs            |   2 +
 crates/sui-indexer-alt/src/metrics.rs         |  70 +++++-
 .../sui-indexer-alt/src/models/watermarks.rs  |  96 ++++++-
 .../src/pipeline/concurrent/mod.rs            |  26 +-
 .../src/pipeline/concurrent/pruner.rs         | 228 ++++++++++++++++++
 8 files changed, 441 insertions(+), 6 deletions(-)
 create mode 100644 crates/sui-indexer-alt/src/pipeline/concurrent/pruner.rs

diff --git a/crates/sui-indexer-alt/src/args.rs b/crates/sui-indexer-alt/src/args.rs
index ac03cca0fe85c..0eda647aba4e9 100644
--- a/crates/sui-indexer-alt/src/args.rs
+++ b/crates/sui-indexer-alt/src/args.rs
@@ -34,6 +34,15 @@ pub enum Command {
         )]
         consistent_pruning_interval: Duration,
 
+        /// How long to wait before honouring reader low watermarks.
+        #[arg(
+            long,
+            default_value = "120",
+            value_name = "SECONDS",
+            value_parser = |s: &str| s.parse().map(Duration::from_secs),
+        )]
+        pruner_delay: Duration,
+
         /// Number of checkpoints to delay indexing summary tables for.
         #[clap(long)]
         consistent_range: Option<u64>,
diff --git a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
index c45cce0cd7b37..3884d6f534530 100644
--- a/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
+++ b/crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
@@ -4,6 +4,7 @@
 use std::sync::Arc;
 
 use anyhow::Result;
+use diesel::{ExpressionMethods, QueryDsl};
 use diesel_async::RunQueryDsl;
 use sui_types::full_checkpoint_content::CheckpointData;
 
@@ -56,4 +57,11 @@ impl Handler for WalCoinBalances {
             .execute(conn)
             .await?)
     }
+
+    async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
+        let filter = wal_coin_balances::table
+            .filter(wal_coin_balances::cp_sequence_number.between(from as i64, to as i64 - 1));
+
+        Ok(diesel::delete(filter).execute(conn).await?)
+    }
 }
diff --git a/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs b/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs
index 6c5f6608cc01b..ba7380ac45b7b 100644
--- a/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs
+++ b/crates/sui-indexer-alt/src/handlers/wal_obj_types.rs
@@ -4,6 +4,7 @@
 use std::sync::Arc;
 
 use anyhow::Result;
+use diesel::{ExpressionMethods, QueryDsl};
 use diesel_async::RunQueryDsl;
 use sui_types::full_checkpoint_content::CheckpointData;
 
@@ -59,4 +60,11 @@ impl Handler for WalObjTypes {
             .execute(conn)
             .await?)
     }
+
+    async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
+        let filter = wal_obj_types::table
+            .filter(wal_obj_types::cp_sequence_number.between(from as i64, to as i64 - 1));
+
+        Ok(diesel::delete(filter).execute(conn).await?)
+    }
 }
diff --git a/crates/sui-indexer-alt/src/main.rs b/crates/sui-indexer-alt/src/main.rs
index 4786a38efb2a8..9644efc6a6996 100644
--- a/crates/sui-indexer-alt/src/main.rs
+++ b/crates/sui-indexer-alt/src/main.rs
@@ -41,6 +41,7 @@ async fn main() -> Result<()> {
         Command::Indexer {
             indexer,
             consistent_pruning_interval,
+            pruner_delay,
             consistent_range: lag,
         } => {
             let retry_interval = indexer.ingestion_config.retry_interval;
@@ -61,6 +62,7 @@ async fn main() -> Result<()> {
             // write-ahead log needs to be pruned.
             let pruner_config = lag.map(|l| PrunerConfig {
                 interval: consistent_pruning_interval,
+                delay: pruner_delay,
                 // Retain at least twice as much data as the lag, to guarantee overlap between the
                 // summary table and the write-ahead log.
                 retention: l * 2,
diff --git a/crates/sui-indexer-alt/src/metrics.rs b/crates/sui-indexer-alt/src/metrics.rs
index 628b61dabfe1c..bb9c8b15b287f 100644
--- a/crates/sui-indexer-alt/src/metrics.rs
+++ b/crates/sui-indexer-alt/src/metrics.rs
@@ -81,7 +81,8 @@ pub struct IndexerMetrics {
     pub handler_checkpoint_latency: HistogramVec,
 
-    // Statistics related to individual ingestion pipelines' committers.
+    // Statistics related to individual ingestion pipelines.
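+    // All of these metrics are labelled by the pipeline they belong to.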
     pub total_collector_rows_received: IntCounterVec,
     pub total_collector_batches_created: IntCounterVec,
     pub total_committer_batches_attempted: IntCounterVec,
@@ -89,24 +89,32 @@ pub struct IndexerMetrics {
     pub total_committer_rows_committed: IntCounterVec,
     pub total_committer_rows_affected: IntCounterVec,
     pub total_watermarks_out_of_order: IntCounterVec,
+    pub total_pruner_chunks_attempted: IntCounterVec,
+    pub total_pruner_chunks_deleted: IntCounterVec,
+    pub total_pruner_rows_deleted: IntCounterVec,
 
     pub collector_gather_latency: HistogramVec,
     pub collector_batch_size: HistogramVec,
     pub committer_commit_latency: HistogramVec,
     pub watermark_gather_latency: HistogramVec,
     pub watermark_commit_latency: HistogramVec,
+    pub watermark_pruner_read_latency: HistogramVec,
+    pub watermark_pruner_write_latency: HistogramVec,
+    pub pruner_delete_latency: HistogramVec,
 
     pub watermark_epoch: IntGaugeVec,
     pub watermark_checkpoint: IntGaugeVec,
     pub watermark_transaction: IntGaugeVec,
     pub watermark_timestamp_ms: IntGaugeVec,
     pub watermark_reader_lo: IntGaugeVec,
+    pub watermark_pruner_hi: IntGaugeVec,
 
     pub watermark_epoch_in_db: IntGaugeVec,
     pub watermark_checkpoint_in_db: IntGaugeVec,
     pub watermark_transaction_in_db: IntGaugeVec,
     pub watermark_timestamp_in_db_ms: IntGaugeVec,
     pub watermark_reader_lo_in_db: IntGaugeVec,
+    pub watermark_pruner_hi_in_db: IntGaugeVec,
 }
 
 /// Collects information about the database connection pool.
@@ -319,6 +327,27 @@ impl IndexerMetrics {
                 registry,
             )
             .unwrap(),
+            total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
+                "indexer_pruner_chunks_attempted",
+                "Number of chunks this pruner attempted to delete",
+                &["pipeline"],
+                registry,
+            )
+            .unwrap(),
+            total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
+                "indexer_pruner_chunks_deleted",
+                "Number of chunks this pruner successfully deleted",
+                &["pipeline"],
+                registry,
+            )
+            .unwrap(),
+            total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
+                "indexer_pruner_rows_deleted",
+                "Number of rows this pruner successfully deleted",
+                &["pipeline"],
+                registry,
+            )
+            .unwrap(),
             collector_gather_latency: register_histogram_vec_with_registry!(
                 "indexer_collector_gather_latency",
                 "Time taken to gather rows into a batch by this collector",
@@ -359,6 +388,30 @@ impl IndexerMetrics {
                 registry,
             )
             .unwrap(),
+            watermark_pruner_read_latency: register_histogram_vec_with_registry!(
+                "indexer_watermark_pruner_read_latency",
+                "Time taken to read the next upper and lower bounds from the database by this pruner",
+                &["pipeline"],
+                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
+                registry,
+            )
+            .unwrap(),
+            watermark_pruner_write_latency: register_histogram_vec_with_registry!(
+                "indexer_watermark_pruner_write_latency",
+                "Time taken to write the new upper bound to the database by this pruner",
+                &["pipeline"],
+                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
+                registry,
+            )
+            .unwrap(),
+            pruner_delete_latency: register_histogram_vec_with_registry!(
+                "indexer_pruner_delete_latency",
+                "Time taken to delete a chunk of data from the database by this pruner",
+                &["pipeline"],
+                DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
+                registry,
+            )
+            .unwrap(),
             watermark_epoch: register_int_gauge_vec_with_registry!(
                 "indexer_watermark_epoch",
                 "Current epoch high watermark for this committer",
@@ -394,6 +447,13 @@ impl IndexerMetrics {
                 registry,
             )
             .unwrap(),
+            watermark_pruner_hi: register_int_gauge_vec_with_registry!(
+                "indexer_watermark_pruner_hi",
+                "Current pruner high watermark for this pruner",
+                &["pipeline"],
+                registry,
+            )
+            .unwrap(),
             watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
                 "indexer_watermark_epoch_in_db",
                 "Last epoch high watermark this committer wrote to the DB",
@@ -429,6 +489,13 @@ impl IndexerMetrics {
                 registry,
             )
             .unwrap(),
+            watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
+                "indexer_watermark_pruner_hi_in_db",
+                "Last pruner high watermark this pruner wrote to the DB",
+                &["pipeline"],
+                registry,
+            )
+            .unwrap(),
         }
     }
 
diff --git a/crates/sui-indexer-alt/src/models/watermarks.rs b/crates/sui-indexer-alt/src/models/watermarks.rs
index d59b2c6ae4393..874e9c1617034 100644
--- a/crates/sui-indexer-alt/src/models/watermarks.rs
+++ b/crates/sui-indexer-alt/src/models/watermarks.rs
@@ -1,10 +1,10 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use std::borrow::Cow;
+use std::{borrow::Cow, time::Duration};
 
 use chrono::{naive::NaiveDateTime, DateTime, Utc};
-use diesel::prelude::*;
+use diesel::{dsl::sql, prelude::*, sql_types};
 use diesel_async::RunQueryDsl;
 use sui_field_count::FieldCount;
 
@@ -42,6 +42,24 @@ pub struct ReaderWatermark<'p> {
     pub reader_lo: i64,
 }
 
+#[derive(Queryable, Debug, Clone, FieldCount)]
+#[diesel(table_name = watermarks)]
+pub struct PrunerWatermark<'p> {
+    /// The pipeline in question.
+    pub pipeline: Cow<'p, str>,
+
+    /// How long to wait from when this query ran on the database until this information can be
+    /// used to prune the database. This number could be negative, meaning no waiting is necessary.
+    pub wait_for: i64,
+
+    /// The pruner can delete up to this checkpoint (exclusive).
+    pub reader_lo: i64,
+
+    /// The pruner has already deleted up to this checkpoint (exclusive), so it can continue from
+    /// this point.
+    pub pruner_hi: i64,
+}
+
 impl StoredWatermark {
     pub async fn get(
         conn: &mut Connection<'_>,
@@ -129,6 +147,80 @@ impl<'p> ReaderWatermark<'p> {
     }
 }
 
+impl PrunerWatermark<'static> {
+    /// Get the bounds for the region that the pruner still has to prune for the given `pipeline`,
+    /// along with a duration to wait before acting on this information, based on the time at
+    /// which `reader_lo` was last updated and the configured `delay`.
+    ///
+    /// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
+    /// `reader_lo` (exclusive) after `wait_for` milliseconds have passed since this response was
+    /// returned.
+    pub async fn get(
+        conn: &mut Connection<'_>,
+        pipeline: &'static str,
+        delay: Duration,
+    ) -> QueryResult<Option<Self>> {
+        //     |---------- + delay ---------------------|
+        //                             |--- wait_for ---|
+        //     |-----------------------|----------------|
+        //     ^                       ^
+        //     pruner_timestamp        NOW()
+        let wait_for = sql::<sql_types::BigInt>(&format!(
+            "CAST({} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
+            delay.as_millis(),
+        ));
+
+        watermarks::table
+            .select((
+                watermarks::pipeline,
+                wait_for,
+                watermarks::reader_lo,
+                watermarks::pruner_hi,
+            ))
+            .filter(watermarks::pipeline.eq(pipeline))
+            .first(conn)
+            .await
+            .optional()
+    }
+}
+
+impl<'p> PrunerWatermark<'p> {
+    /// How long to wait before the pruner can act on this information, or `None` if there is no
+    /// need to wait.
+    pub fn wait_for(&self) -> Option<Duration> {
+        (self.wait_for > 0).then(|| Duration::from_millis(self.wait_for as u64))
+    }
+
+    /// Whether the pruner has any work left to do on the range in this watermark.
+    pub fn is_empty(&self) -> bool {
+        self.pruner_hi >= self.reader_lo
+    }
+
+    /// The next chunk that the pruner should work on, to advance the watermark.
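+    ///
+    /// The returned bounds are half-open: `from` is inclusive and `to` is exclusive. For example,
+    /// with `pruner_hi = 100`, `reader_lo = 250` and `size = 100`, the first chunk is
+    /// `(100, 200)`, and once `pruner_hi` has advanced to 200, the next is `(200, 250)`.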
+    pub fn next_chunk(&mut self, size: u64) -> (u64, u64) {
+        let from = self.pruner_hi as u64;
+        let to = (from + size).min(self.reader_lo as u64);
+        (from, to)
+    }
+
+    /// Update the pruner high watermark (only) for an existing watermark row, as long as this
+    /// raises the watermark.
+    ///
+    /// Returns a boolean indicating whether the watermark was actually updated or not.
+    pub async fn update(&self, conn: &mut Connection<'_>) -> QueryResult<bool> {
+        Ok(diesel::update(watermarks::table)
+            .set(watermarks::pruner_hi.eq(self.pruner_hi))
+            .filter(watermarks::pipeline.eq(&self.pipeline))
+            .execute(conn)
+            .await?
+            > 0)
+    }
+}
+
 impl<'p> From<CommitterWatermark<'p>> for StoredWatermark {
     fn from(watermark: CommitterWatermark<'p>) -> Self {
         StoredWatermark {
diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs b/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
index 3275a1fd18930..547d8d4364c7f 100644
--- a/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
+++ b/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
@@ -16,11 +16,14 @@ use crate::{
 };
 
 use super::{processor::processor, PipelineConfig, Processor, WatermarkPart, PIPELINE_BUFFER};
 
-use self::{collector::collector, commit_watermark::commit_watermark, committer::committer};
+use self::{
+    collector::collector, commit_watermark::commit_watermark, committer::committer, pruner::pruner,
+};
 
 mod collector;
 mod commit_watermark;
 mod committer;
+mod pruner;
 mod reader_watermark;
 
 /// The maximum number of watermarks that can show up in a single batch. This limit exists to deal
@@ -63,6 +66,12 @@ pub trait Handler: Processor {
     /// affected.
     async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> anyhow::Result<usize>;
+
+    /// Clean up data between checkpoints `_from` (inclusive) and `_to` (exclusive) in the
+    /// database, returning the number of rows affected. This function is optional, and defaults
+    /// to not pruning at all.
+    async fn prune(_from: u64, _to: u64, _conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
+        Ok(0)
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -70,6 +79,10 @@ pub struct PrunerConfig {
     /// How often the pruner should check whether there is any data to prune.
     pub interval: Duration,
 
+    /// How long to wait after the reader low watermark was set until it is safe to prune up to
+    /// this new watermark.
+    pub delay: Duration,
+
     /// How much data to keep, this is measured in checkpoints.
     pub retention: u64,
 
@@ -181,12 +194,19 @@ pub(crate) fn pipeline(
         cancel,
     );
 
-    let reader_watermark = reader_watermark::<H>(pruner_config, db, metrics, pruner_cancel.clone());
+    let reader_watermark = reader_watermark::<H>(
+        pruner_config.clone(),
+        db.clone(),
+        metrics.clone(),
+        pruner_cancel.clone(),
+    );
+
+    let pruner = pruner::<H>(pruner_config, db, metrics, pruner_cancel.clone());
 
     tokio::spawn(async move {
         let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);
 
         pruner_cancel.cancel();
-        let _ = futures::join!(reader_watermark);
+        let _ = futures::join!(reader_watermark, pruner);
     })
 }
diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/pruner.rs b/crates/sui-indexer-alt/src/pipeline/concurrent/pruner.rs
new file mode 100644
index 0000000000000..8aa4c0c53d35a
--- /dev/null
+++ b/crates/sui-indexer-alt/src/pipeline/concurrent/pruner.rs
@@ -0,0 +1,228 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use std::sync::Arc;
+
+use mysten_metrics::spawn_monitored_task;
+use tokio::{
+    task::JoinHandle,
+    time::{interval, MissedTickBehavior},
+};
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error, info, warn};
+
+use crate::{
+    db::Db, metrics::IndexerMetrics, models::watermarks::PrunerWatermark,
+    pipeline::LOUD_WATERMARK_UPDATE_INTERVAL,
+};
+
+use super::{Handler, PrunerConfig};
+
+/// The pruner task is responsible for deleting old data from the database. It will periodically
+/// check the `watermarks` table to see if there is any data that should be pruned -- between
+/// `pruner_hi` (inclusive) and `reader_lo` (exclusive).
+///
+/// To ensure that the pruner does not interfere with reads that are still in flight, it respects
+/// the watermark's `pruner_timestamp`, which records the time that `reader_lo` was last updated.
+/// The task will not prune data until at least `config.delay` has passed since
+/// `pruner_timestamp`, to give in-flight reads time to land.
+///
+/// The task regularly traces its progress, outputting at a higher log level every
+/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
+///
+/// The task will shut down if the `cancel` token is signalled. If the `config` is `None`, the
+/// task will shut down immediately.
+pub(super) fn pruner<H: Handler + 'static>(
+    config: Option<PrunerConfig>,
+    db: Db,
+    metrics: Arc<IndexerMetrics>,
+    cancel: CancellationToken,
+) -> JoinHandle<()> {
+    spawn_monitored_task!(async move {
+        let Some(config) = config else {
+            info!(pipeline = H::NAME, "Skipping pruner task");
+            return;
+        };
+
+        // The pruner can pause for a while, waiting for the delay imposed by the
+        // `pruner_timestamp` to expire. In that case, the period between ticks should not be
+        // compressed to make up for missed ticks.
+        let mut poll = interval(config.interval);
+        poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
+
+        // The pruner task will periodically output a log message at a higher log level to
+        // demonstrate that it is making progress.
+        let mut next_loud_watermark_update = 0;
+
+        'outer: loop {
+            // (1) Get the latest pruning bounds from the database.
+            let mut watermark = tokio::select! {
+                _ = cancel.cancelled() => {
+                    info!(pipeline = H::NAME, "Shutdown received");
+                    break;
+                }
+
+                _ = poll.tick() => {
+                    let guard = metrics
+                        .watermark_pruner_read_latency
+                        .with_label_values(&[H::NAME])
+                        .start_timer();
+
+                    let Ok(mut conn) = db.connect().await else {
+                        warn!(pipeline = H::NAME, "Pruner failed to connect while fetching watermark");
+                        continue;
+                    };
+
+                    match PrunerWatermark::get(&mut conn, H::NAME, config.delay).await {
+                        Ok(Some(current)) => {
+                            guard.stop_and_record();
+                            current
+                        }
+
+                        Ok(None) => {
+                            guard.stop_and_record();
+                            warn!(pipeline = H::NAME, "No watermark for pipeline, skipping");
+                            continue;
+                        }
+
+                        Err(e) => {
+                            guard.stop_and_record();
+                            warn!(pipeline = H::NAME, "Failed to get watermark: {e}");
+                            continue;
+                        }
+                    }
+                }
+            };
+
+            // (2) Wait until this information can be acted upon.
+            if let Some(wait_for) = watermark.wait_for() {
+                debug!(pipeline = H::NAME, ?wait_for, "Waiting to prune");
+                tokio::select! {
+                    _ = tokio::time::sleep(wait_for) => {}
+                    _ = cancel.cancelled() => {
+                        info!(pipeline = H::NAME, "Shutdown received");
+                        break;
+                    }
+                }
+            }
+
+            // (3) Prune chunk by chunk to avoid the task waiting on a long-running database
+            // transaction between tests for cancellation.
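+            // Each chunk covers at most `config.max_chunk_size` checkpoints, and cancellation is
+            // checked again before each chunk is deleted.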
+            while !watermark.is_empty() {
+                if cancel.is_cancelled() {
+                    info!(pipeline = H::NAME, "Shutdown received");
+                    break 'outer;
+                }
+
+                metrics
+                    .total_pruner_chunks_attempted
+                    .with_label_values(&[H::NAME])
+                    .inc();
+
+                let guard = metrics
+                    .pruner_delete_latency
+                    .with_label_values(&[H::NAME])
+                    .start_timer();
+
+                let Ok(mut conn) = db.connect().await else {
+                    warn!(
+                        pipeline = H::NAME,
+                        "Pruner failed to connect while pruning"
+                    );
+                    break;
+                };
+
+                let (from, to) = watermark.next_chunk(config.max_chunk_size);
+                let affected = match H::prune(from, to, &mut conn).await {
+                    Ok(affected) => {
+                        guard.stop_and_record();
+                        watermark.pruner_hi = to as i64;
+                        affected
+                    }
+
+                    Err(e) => {
+                        guard.stop_and_record();
+                        error!(pipeline = H::NAME, "Failed to prune data: {e}");
+                        break;
+                    }
+                };
+
+                metrics
+                    .total_pruner_chunks_deleted
+                    .with_label_values(&[H::NAME])
+                    .inc();
+
+                metrics
+                    .total_pruner_rows_deleted
+                    .with_label_values(&[H::NAME])
+                    .inc_by(affected as u64);
+
+                metrics
+                    .watermark_pruner_hi
+                    .with_label_values(&[H::NAME])
+                    .set(watermark.pruner_hi);
+            }
+
+            // (4) Update the pruner watermark.
+            let guard = metrics
+                .watermark_pruner_write_latency
+                .with_label_values(&[H::NAME])
+                .start_timer();
+
+            let Ok(mut conn) = db.connect().await else {
+                warn!(
+                    pipeline = H::NAME,
+                    "Pruner failed to connect while updating watermark"
+                );
+                continue;
+            };
+
+            match watermark.update(&mut conn).await {
+                Err(e) => {
+                    let elapsed = guard.stop_and_record();
+                    error!(
+                        pipeline = H::NAME,
+                        elapsed_ms = elapsed * 1000.0,
+                        "Failed to update pruner watermark: {e}"
+                    )
+                }
+
+                Ok(updated) => {
+                    let elapsed = guard.stop_and_record();
+
+                    if updated {
+                        metrics
+                            .watermark_pruner_hi_in_db
+                            .with_label_values(&[H::NAME])
+                            .set(watermark.pruner_hi);
+                    }
+
+                    if watermark.pruner_hi > next_loud_watermark_update {
+                        next_loud_watermark_update =
+                            watermark.pruner_hi + LOUD_WATERMARK_UPDATE_INTERVAL;
+
+                        info!(
+                            pipeline = H::NAME,
+                            pruner_hi = watermark.pruner_hi,
+                            updated,
+                            elapsed_ms = elapsed * 1000.0,
+                            "Watermark"
+                        );
+                    } else {
+                        debug!(
+                            pipeline = H::NAME,
+                            pruner_hi = watermark.pruner_hi,
+                            updated,
+                            elapsed_ms = elapsed * 1000.0,
+                            "Watermark"
+                        );
+                    }
+                }
+            }
+        }
+
+        info!(pipeline = H::NAME, "Stopping pruner");
+    })
+}