[indexer-alt] Make pruning parallel
lxfind committed Jan 16, 2025
1 parent 0598968 commit 79fad42
Showing 2 changed files with 135 additions and 55 deletions.
188 changes: 133 additions & 55 deletions crates/sui-indexer-alt-framework/src/pipeline/concurrent/pruner.rs
@@ -1,10 +1,13 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};

use futures::stream::FuturesUnordered;
use futures::StreamExt;
use sui_pg_db::Db;
use tokio::{
sync::Semaphore,
task::JoinHandle,
time::{interval, MissedTickBehavior},
};
@@ -19,6 +22,9 @@ use crate::{

use super::{Handler, PrunerConfig};

// TODO: Consider making this configurable.
const MAX_CONCURRENT_PRUNER_TASKS: usize = 10;

/// The pruner task is responsible for deleting old data from the database. It will periodically
/// check the `watermarks` table to see if there is any data that should be pruned between the
/// `pruner_hi` (inclusive), and `reader_lo` (exclusive) checkpoints. This task will also provide a
@@ -63,6 +69,11 @@ pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
// demonstrate that it is making progress.
let mut logger = WatermarkLogger::new("pruner", LoggerWatermark::default());

// Maintains the list of chunks that are ready to be pruned but not yet pruned.
// This map can contain ranges that were attempted to be pruned in previous iterations,
// but failed due to errors.
let mut pending_prune_ranges = BTreeMap::new();

'outer: loop {
// (1) Get the latest pruning bounds from the database.
let mut watermark = tokio::select! {
@@ -115,65 +126,86 @@ pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
}
}

// (3) Prune chunk by chunk to avoid the task waiting on a long-running database
// transaction, between tests for cancellation.
while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
if cancel.is_cancelled() {
info!(pipeline = H::NAME, "Shutdown received");
break 'outer;
}
// Keep a copy of the watermark for the db_watermark.
// This is because we can only advance db_watermark when all checkpoints
// up to it have been pruned.
let db_watermark = watermark.clone();

metrics
.total_pruner_chunks_attempted
.with_label_values(&[H::NAME])
.inc();
// (3) Collect all the new chunks that are ready to be pruned.
// This will also advance the watermark.
while let Some((from, to_exclusive)) = watermark.next_chunk(config.max_chunk_size) {
pending_prune_ranges.insert(from, to_exclusive);
}

let guard = metrics
.pruner_delete_latency
.with_label_values(&[H::NAME])
.start_timer();
debug!(
pipeline = H::NAME,
"Number of chunks to prune: {}",
pending_prune_ranges.len()
);

let Ok(mut conn) = db.connect().await else {
warn!(
pipeline = H::NAME,
"Pruner failed to connect, while pruning"
);
break;
};
// (3) Prune chunk by chunk to avoid the task waiting on a long-running database
// transaction, between tests for cancellation.
// Spawn all tasks in parallel, but limit the number of concurrent tasks.
let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PRUNER_TASKS));
let mut tasks = FuturesUnordered::new();
for (from, to_exclusive) in pending_prune_ranges.iter() {
let semaphore = semaphore.clone();
let cancel = cancel.child_token();
let db = db.clone();
let metrics = metrics.clone();
let handler = handler.clone();
let from = *from;
let to_exclusive = *to_exclusive;

debug!(
pipeline = H::NAME,
"Pruning from {} to {}", from, to_exclusive
);
tasks.push(tokio::spawn(async move {
let _permit = tokio::select! {
permit = semaphore.acquire() => {
permit.unwrap()
}
_ = cancel.cancelled() => {
return ((from, to_exclusive), Err(anyhow::anyhow!("Cancelled")));
}
};
let result = prune_task_impl(metrics, db, handler, from, to_exclusive).await;
((from, to_exclusive), result)
}));
}

let affected = match handler.prune(from, to_exclusive, &mut conn).await {
Ok(affected) => {
guard.stop_and_record();
watermark.pruner_hi = to_exclusive as i64;
affected
// (4) Wait for all tasks to finish.
// For each task, if it succeeds, remove the range from the pending_prune_ranges.
// Otherwise the range will remain in the map and will be retried in the next iteration.
while let Some(r) = tasks.next().await {
let ((from, to_exclusive), result) = r.unwrap();
match result {
Ok(()) => {
let removed = pending_prune_ranges.remove(&from);
assert_eq!(
removed,
Some(to_exclusive),
"Removed range is not the same as the one we expected to remove"
);
// The latest pruner_hi can be derived from the first checkpoint that has not yet been pruned.
// This would be the first key in the pending_prune_ranges map.
// It guarantees that all checkpoints up to it have been pruned.
// If the map is empty, then pruner_hi is the last checkpoint that has been pruned.
db_watermark.pruner_hi = pending_prune_ranges
.keys()
.next()
.cloned()
.unwrap_or(to_exclusive)
as i64;
metrics
.watermark_pruner_hi
.with_label_values(&[H::NAME])
.set(db_watermark.pruner_hi);
}

Err(e) => {
guard.stop_and_record();
error!(pipeline = H::NAME, "Failed to prune data: {e}");
break;
error!(
pipeline = H::NAME,
"Failed to prune data for range: {from} to {to_exclusive}: {e}"
);
}
};

metrics
.total_pruner_chunks_deleted
.with_label_values(&[H::NAME])
.inc();

metrics
.total_pruner_rows_deleted
.with_label_values(&[H::NAME])
.inc_by(affected as u64);

metrics
.watermark_pruner_hi
.with_label_values(&[H::NAME])
.set(watermark.pruner_hi);
}
}

// (4) Update the pruner watermark
@@ -190,7 +222,7 @@ pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
continue;
};

match watermark.update(&mut conn).await {
match db_watermark.update(&mut conn).await {
Err(e) => {
let elapsed = guard.stop_and_record();
error!(
@@ -202,12 +234,12 @@ pub(super) fn pruner<H: Handler + Send + Sync + 'static>(

Ok(true) => {
let elapsed = guard.stop_and_record();
logger.log::<H>(&watermark, elapsed);
logger.log::<H>(&db_watermark, elapsed);

metrics
.watermark_pruner_hi_in_db
.with_label_values(&[H::NAME])
.set(watermark.pruner_hi);
.set(db_watermark.pruner_hi);
}
Ok(false) => {}
}
@@ -216,3 +248,49 @@ pub(super) fn pruner<H: Handler + Send + Sync + 'static>(
info!(pipeline = H::NAME, "Stopping pruner");
})
}

async fn prune_task_impl<H: Handler + Send + Sync + 'static>(
metrics: Arc<IndexerMetrics>,
db: Db,
handler: Arc<H>,
from: u64,
to_exclusive: u64,
) -> Result<(), anyhow::Error> {
metrics
.total_pruner_chunks_attempted
.with_label_values(&[H::NAME])
.inc();

let guard = metrics
.pruner_delete_latency
.with_label_values(&[H::NAME])
.start_timer();

let mut conn = db.connect().await?;

debug!(pipeline = H::NAME, "Pruning from {from} to {to_exclusive}");

let affected = match handler.prune(from, to_exclusive, &mut conn).await {
Ok(affected) => {
guard.stop_and_record();
affected
}

Err(e) => {
guard.stop_and_record();
return Err(e);
}
};

metrics
.total_pruner_chunks_deleted
.with_label_values(&[H::NAME])
.inc();

metrics
.total_pruner_rows_deleted
.with_label_values(&[H::NAME])
.inc_by(affected as u64);

Ok(())
}
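
The pattern this commit introduces — a `BTreeMap` of pending `(from, to_exclusive)` ranges, a `Semaphore` capping concurrent prune tasks, `FuturesUnordered` to collect completions, and a `pruner_hi` derived from the first still-pending key — can be reduced to a standalone sketch. The snippet below is illustrative only, not the framework's API: `prune_range`, `MAX_TASKS`, and the hard-coded ranges are hypothetical, and it assumes the `tokio` (with `rt-multi-thread`, `macros`, `sync`), `futures`, and `anyhow` crates.

// Illustrative reduction of the parallel pruning pattern; `prune_range` and
// `MAX_TASKS` are hypothetical stand-ins, not the framework's real API.
use std::{collections::BTreeMap, sync::Arc};

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::Semaphore;

const MAX_TASKS: usize = 10;

// Pretend prune operation: some ranges fail so the retry path is exercised.
async fn prune_range(from: u64, to_exclusive: u64) -> anyhow::Result<()> {
    if from % 200 == 0 {
        Ok(())
    } else {
        anyhow::bail!("simulated failure pruning {from}..{to_exclusive}")
    }
}

#[tokio::main]
async fn main() {
    // Ranges ready to prune; a failed range stays here and is retried on the
    // next pass of the (omitted) outer loop.
    let mut pending: BTreeMap<u64, u64> =
        [(0, 100), (100, 200), (200, 300), (300, 400)].into_iter().collect();
    let mut pruner_hi = 0u64;

    // Cap how many prune tasks hit the database at once.
    let semaphore = Arc::new(Semaphore::new(MAX_TASKS));
    let mut tasks = FuturesUnordered::new();
    for (&from, &to_exclusive) in &pending {
        let semaphore = semaphore.clone();
        tasks.push(tokio::spawn(async move {
            let _permit = semaphore.acquire().await.expect("semaphore closed");
            ((from, to_exclusive), prune_range(from, to_exclusive).await)
        }));
    }

    // Successful ranges leave the map; the watermark only ever advances to the
    // first range that is still pending, so it cannot skip unpruned data.
    while let Some(joined) = tasks.next().await {
        let ((from, to_exclusive), result) = joined.expect("prune task panicked");
        match result {
            Ok(()) => {
                pending.remove(&from);
                pruner_hi = pending.keys().next().copied().unwrap_or(to_exclusive);
            }
            Err(e) => eprintln!("range {from}..{to_exclusive} will be retried: {e}"),
        }
    }

    println!("pruner_hi = {pruner_hi}, still pending: {pending:?}");
}

Deriving `pruner_hi` from the smallest pending key keeps the persisted watermark conservative: a failed chunk stays in the map and is retried on the next iteration of the outer loop, so the watermark can stall but never skips over unpruned data.
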
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt-framework/src/watermarks.rs
@@ -215,13 +215,15 @@ impl<'p> PrunerWatermark<'p> {
/// The next chunk of checkpoints that the pruner should work on, to advance the watermark.
/// If no more checkpoints to prune, returns `None`.
/// Otherwise, returns a tuple (from, to_exclusive) where `from` is inclusive and `to_exclusive` is exclusive.
/// This also advances the watermark.
pub(crate) fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
if self.pruner_hi >= self.reader_lo {
return None;
}

let from = self.pruner_hi as u64;
let to_exclusive = (from + size).min(self.reader_lo as u64);
self.pruner_hi = to_exclusive as i64;
Some((from, to_exclusive))
}

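The watermarks.rs change makes `next_chunk` advance the in-memory watermark itself: each call hands back `[pruner_hi, min(pruner_hi + size, reader_lo))` and bumps `pruner_hi`, which is what lets the pruner loop drain every prunable chunk up front before scheduling them in parallel. The sketch below uses a simplified, hypothetical `Watermark` struct — not the framework's `PrunerWatermark` — to show that behavior.

// Simplified stand-in for the framework's `PrunerWatermark`; field names and
// the i64 bounds mirror the diff, but this is not the real type.
struct Watermark {
    pruner_hi: i64, // next checkpoint to prune (inclusive)
    reader_lo: i64, // first checkpoint still needed by readers (exclusive bound)
}

impl Watermark {
    /// Returns the next `[from, to_exclusive)` chunk and advances `pruner_hi`,
    /// or `None` once the pruner has caught up with `reader_lo`.
    fn next_chunk(&mut self, size: u64) -> Option<(u64, u64)> {
        if self.pruner_hi >= self.reader_lo {
            return None;
        }
        let from = self.pruner_hi as u64;
        let to_exclusive = (from + size).min(self.reader_lo as u64);
        self.pruner_hi = to_exclusive as i64;
        Some((from, to_exclusive))
    }
}

fn main() {
    let mut watermark = Watermark { pruner_hi: 0, reader_lo: 250 };

    // Because `next_chunk` advances the in-memory watermark, the caller can
    // collect all prunable chunks before handing them to parallel tasks.
    let mut chunks = Vec::new();
    while let Some(chunk) = watermark.next_chunk(100) {
        chunks.push(chunk);
    }
    assert_eq!(chunks, vec![(0, 100), (100, 200), (200, 250)]);
    assert_eq!(watermark.pruner_hi, 250);
    println!("chunks = {chunks:?}");
}
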
