From fdb4d1d801b01267176c56a058a0f51334fbda9d Mon Sep 17 00:00:00 2001
From: Xun Li
Date: Wed, 15 Jan 2025 17:30:41 -0800
Subject: [PATCH] [indexer-alt] Make pruning parallel

---
 .../src/pipeline/concurrent/pruner.rs | 168 ++++++++++++------
 .../src/watermarks.rs                 |   2 +
 2 files changed, 117 insertions(+), 53 deletions(-)

diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs
index 5990e9bfe49fa2..8e18563e7114e1 100644
--- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs
+++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs
@@ -1,10 +1,13 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use std::sync::Arc;
+use std::{collections::BTreeMap, sync::Arc};
 
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
 use sui_pg_db::Db;
 use tokio::{
+    sync::Semaphore,
     task::JoinHandle,
     time::{interval, MissedTickBehavior},
 };
@@ -19,6 +22,9 @@ use crate::{
 
 use super::{Handler, PrunerConfig};
 
+// TODO: Consider making this configurable.
+const MAX_CONCURRENT_PRUNER_TASKS: usize = 10;
+
 /// The pruner task is responsible for deleting old data from the database. It will periodically
 /// check the `watermarks` table to see if there is any data that should be pruned between the
 /// `pruner_hi` (inclusive), and `reader_lo` (exclusive) checkpoints. This task will also provide a
@@ -63,6 +69,7 @@
         // demonstrate that it is making progress.
         let mut logger = WatermarkLogger::new("pruner", LoggerWatermark::default());
 
+        let mut pending_prune_ranges = BTreeMap::new();
         'outer: loop {
             // (1) Get the latest pruning bounds from the database.
             let mut watermark = tokio::select! {
@@ -115,65 +122,74 @@
             }
         }
 
-        // (3) Prune chunk by chunk to avoid the task waiting on a long-running database
-        // transaction, between tests for cancellation.
-        while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
-            if cancel.is_cancelled() {
-                info!(pipeline = H::NAME, "Shutdown received");
-                break 'outer;
-            }
-
-            metrics
-                .total_pruner_chunks_attempted
-                .with_label_values(&[H::NAME])
-                .inc();
-
-            let guard = metrics
-                .pruner_delete_latency
-                .with_label_values(&[H::NAME])
-                .start_timer();
-
-            let Ok(mut conn) = db.connect().await else {
-                warn!(
-                    pipeline = H::NAME,
-                    "Pruner failed to connect, while pruning"
-                );
-                break;
-            };
-
-            debug!(
-                pipeline = H::NAME,
-                "Pruning from {} to {}", from, to_exclusive
-            );
-
-            let affected = match handler.prune(from, to_exclusive, &mut conn).await {
-                Ok(affected) => {
-                    guard.stop_and_record();
-                    watermark.pruner_hi = to_exclusive as i64;
-                    affected
-                }
-                Err(e) => {
-                    guard.stop_and_record();
-                    error!(pipeline = H::NAME, "Failed to prune data: {e}");
-                    break;
-                }
-            };
-
-            metrics
-                .total_pruner_chunks_deleted
-                .with_label_values(&[H::NAME])
-                .inc();
-
-            metrics
-                .total_pruner_rows_deleted
-                .with_label_values(&[H::NAME])
-                .inc_by(affected as u64);
-
-            metrics
-                .watermark_pruner_hi
-                .with_label_values(&[H::NAME])
-                .set(watermark.pruner_hi);
-        }
+        // (3) Collect all the new chunks that are ready to be pruned.
+        let mut tmp_watermark = watermark.clone();
+        while let Some((from, to_exclusive)) = tmp_watermark.next_chunk(config.max_chunk_size) {
+            pending_prune_ranges.insert(from, to_exclusive);
+        }
+
+        // (4) Prune the pending ranges, spawning all chunk tasks in parallel but capping
+        // their concurrency, so that the task never waits on a single long-running
+        // database transaction between tests for cancellation.
+        let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PRUNER_TASKS));
+        let mut tasks = FuturesUnordered::new();
+        for (from, to_exclusive) in pending_prune_ranges.iter() {
+            let semaphore = semaphore.clone();
+            let cancel = cancel.child_token();
+            let db = db.clone();
+            let metrics = metrics.clone();
+            let handler = handler.clone();
+            let from = *from;
+            let to_exclusive = *to_exclusive;
+
+            tasks.push(tokio::spawn(async move {
+                let _permit = tokio::select! {
+                    permit = semaphore.acquire() => {
+                        permit.unwrap()
+                    }
+                    _ = cancel.cancelled() => {
+                        return ((from, to_exclusive), Err(anyhow::anyhow!("Cancelled")));
+                    }
+                };
+                let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
+                ((from, to_exclusive), result)
+            }));
+        }
+
+        // (5) Wait for all tasks to finish. If a task succeeds, remove its range from
+        // pending_prune_ranges. Otherwise the range remains in the map and will be
+        // retried in the next iteration of the outer loop.
+        while let Some(r) = tasks.next().await {
+            let ((from, to_exclusive), result) = r.unwrap();
+            match result {
+                Ok(()) => {
+                    let removed = pending_prune_ranges.remove(&from);
+                    assert_eq!(
+                        removed,
+                        Some(to_exclusive),
+                        "Removed range is not the same as the one we expected to remove"
+                    );
+                    // The new pruner_hi is the first checkpoint that has not yet been
+                    // pruned, i.e. the first key in the pending_prune_ranges map. If the
+                    // map is empty, to_exclusive is the first unpruned checkpoint.
+                    let pruner_hi = pending_prune_ranges
+                        .keys()
+                        .next()
+                        .cloned()
+                        .unwrap_or(to_exclusive) as i64;
+                    metrics
+                        .watermark_pruner_hi
+                        .with_label_values(&[H::NAME])
+                        .set(pruner_hi);
+                    watermark.pruner_hi = pruner_hi;
+                }
+                Err(e) => {
+                    error!(
+                        pipeline = H::NAME,
+                        "Failed to prune data for range: {from} to {to_exclusive}: {e}"
+                    );
+                }
+            }
+        }
 
-        // (4) Update the pruner watermark
+        // (6) Update the pruner watermark
@@ -216,3 +232,49 @@
         info!(pipeline = H::NAME, "Stopping pruner");
     })
 }
+
+async fn prune_task_impl<H: Handler>(
+    metrics: Arc<IndexerMetrics>,
+    db: Db,
+    handler: Arc<H>,
+    from: u64,
+    to_exclusive: u64,
+) -> Result<(), anyhow::Error> {
+    metrics
+        .total_pruner_chunks_attempted
+        .with_label_values(&[H::NAME])
+        .inc();
+
+    let guard = metrics
+        .pruner_delete_latency
+        .with_label_values(&[H::NAME])
+        .start_timer();
+
+    let mut conn = db.connect().await?;
+
+    debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");
+
+    let affected = match handler.prune(from, to_exclusive, &mut conn).await {
+        Ok(affected) => {
+            guard.stop_and_record();
+            affected
+        }
+
+        Err(e) => {
+            guard.stop_and_record();
+            return Err(e);
+        }
+    };
+
+    metrics
+        .total_pruner_chunks_deleted
+        .with_label_values(&[H::NAME])
+        .inc();
+
+    metrics
+        .total_pruner_rows_deleted
+        .with_label_values(&[H::NAME])
+        .inc_by(affected as u64);
+
+    Ok(())
+}
diff --git a/crates/sui-indexer-alt-framework/src/watermarks.rs b/crates/sui-indexer-alt-framework/src/watermarks.rs
index 027e2aaab3aefa..c6545323bd69a8 100644
--- a/crates/sui-indexer-alt-framework/src/watermarks.rs
+++ b/crates/sui-indexer-alt-framework/src/watermarks.rs
@@ -215,6 +215,7 @@ impl<'p> PrunerWatermark<'p> {
     /// The next chunk of checkpoints that the pruner should work on, to advance the watermark.
     /// If no more checkpoints to prune, returns `None`.
     /// Otherwise, returns a tuple (from, to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive.
+    /// Also advances `pruner_hi`, so that repeated calls yield successive chunks.
     pub(crate) fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
         if self.pruner_hi >= self.reader_lo {
             return None;
@@ -222,6 +223,7 @@ impl<'p> PrunerWatermark<'p> {
         }
 
         let from = self.pruner_hi as u64;
         let to_exclusive = (from + size).min(self.reader_lo as u64);
+        self.pruner_hi = to_exclusive as i64;
         Some((from, to_exclusive))
     }
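
Reviewer note (not part of the patch): the scheduling pattern above — a semaphore capping concurrent prune tasks, plus an ordered map of pending ranges so that `pruner_hi` only ever advances past a contiguous prefix of completed chunks — can be exercised on its own. Below is a minimal, self-contained sketch, assuming the tokio, futures, and anyhow crates; the constant, range values, and the stub standing in for `Handler::prune` are all invented for illustration, and one range is made to fail to show the retry/watermark behaviour.

use std::{collections::BTreeMap, sync::Arc};

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    const MAX_CONCURRENT_TASKS: usize = 10;

    // Pending ranges to prune: from -> to_exclusive (hypothetical values).
    let mut pending: BTreeMap<u64, u64> = (0..5u64).map(|i| (i * 100, (i + 1) * 100)).collect();

    let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_TASKS));
    let mut tasks = FuturesUnordered::new();

    for (&from, &to_exclusive) in pending.iter() {
        let semaphore = semaphore.clone();
        tasks.push(tokio::spawn(async move {
            // Cap concurrency, as the patch does with MAX_CONCURRENT_PRUNER_TASKS.
            let _permit = semaphore.acquire().await.unwrap();
            // Stub for the real prune call: fail one range to exercise the retry path.
            let result: Result<(), anyhow::Error> = if from == 200 {
                Err(anyhow::anyhow!("simulated failure"))
            } else {
                Ok(())
            };
            ((from, to_exclusive), result)
        }));
    }

    let mut pruner_hi = 0;
    while let Some(joined) = tasks.next().await {
        let ((from, to_exclusive), result) = joined.unwrap();
        if result.is_ok() {
            pending.remove(&from);
            // Advance the watermark only to the first still-pending range, so a failed
            // chunk holds pruner_hi back until it is retried successfully.
            pruner_hi = pending.keys().next().copied().unwrap_or(to_exclusive);
        }
    }

    // The failing range (200..300) is still pending, so the watermark stops at 200.
    assert_eq!(pruner_hi, 200);
}

The final assertion is the point of using a BTreeMap rather than a plain list: chunks may finish out of order, but the persisted watermark never skips over a failed range.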