Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer-alt: pruner task #20217

Merged
merged 2 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions crates/sui-indexer-alt/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ pub enum Command {
)]
consistent_pruning_interval: Duration,

/// How long to wait before honouring reader low watermarks.
#[arg(
long,
default_value = "120",
value_name = "SECONDS",
value_parser = |s: &str| s.parse().map(Duration::from_secs),
)]
pruner_delay: Duration,

/// Number of checkpoints to delay indexing summary tables for.
#[clap(long)]
consistent_range: Option<u64>,
Expand Down
8 changes: 8 additions & 0 deletions crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

Expand Down Expand Up @@ -56,4 +57,11 @@ impl Handler for WalCoinBalances {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let filter = wal_coin_balances::table
.filter(wal_coin_balances::cp_sequence_number.between(from as i64, to as i64 - 1));
amnn marked this conversation as resolved.
Show resolved Hide resolved

Ok(diesel::delete(filter).execute(conn).await?)
}
}
8 changes: 8 additions & 0 deletions crates/sui-indexer-alt/src/handlers/wal_obj_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_types::full_checkpoint_content::CheckpointData;

Expand Down Expand Up @@ -59,4 +60,11 @@ impl Handler for WalObjTypes {
.execute(conn)
.await?)
}

async fn prune(from: u64, to: u64, conn: &mut db::Connection<'_>) -> Result<usize> {
let filter = wal_obj_types::table
.filter(wal_obj_types::cp_sequence_number.between(from as i64, to as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
2 changes: 2 additions & 0 deletions crates/sui-indexer-alt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async fn main() -> Result<()> {
Command::Indexer {
indexer,
consistent_pruning_interval,
pruner_delay,
consistent_range: lag,
} => {
let retry_interval = indexer.ingestion_config.retry_interval;
Expand All @@ -61,6 +62,7 @@ async fn main() -> Result<()> {
// write-ahead log needs to be pruned.
let pruner_config = lag.map(|l| PrunerConfig {
interval: consistent_pruning_interval,
delay: pruner_delay,
// Retain at least twice as much data as the lag, to guarantee overlap between the
// summary table and the write-ahead log.
retention: l * 2,
Expand Down
69 changes: 68 additions & 1 deletion crates/sui-indexer-alt/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,32 +81,40 @@ pub struct IndexerMetrics {

pub handler_checkpoint_latency: HistogramVec,

// Statistics related to individual ingestion pipelines' committers.
// Statistics related to individual ingestion pipelines.
pub total_collector_rows_received: IntCounterVec,
pub total_collector_batches_created: IntCounterVec,
pub total_committer_batches_attempted: IntCounterVec,
pub total_committer_batches_succeeded: IntCounterVec,
pub total_committer_rows_committed: IntCounterVec,
pub total_committer_rows_affected: IntCounterVec,
pub total_watermarks_out_of_order: IntCounterVec,
pub total_pruner_chunks_attempted: IntCounterVec,
pub total_pruner_chunks_deleted: IntCounterVec,
pub total_pruner_rows_deleted: IntCounterVec,

pub collector_gather_latency: HistogramVec,
pub collector_batch_size: HistogramVec,
pub committer_commit_latency: HistogramVec,
pub watermark_gather_latency: HistogramVec,
pub watermark_commit_latency: HistogramVec,
pub watermark_pruner_read_latency: HistogramVec,
pub watermark_pruner_write_latency: HistogramVec,
pub pruner_delete_latency: HistogramVec,

pub watermark_epoch: IntGaugeVec,
pub watermark_checkpoint: IntGaugeVec,
pub watermark_transaction: IntGaugeVec,
pub watermark_timestamp_ms: IntGaugeVec,
pub watermark_reader_lo: IntGaugeVec,
pub watermark_pruner_hi: IntGaugeVec,

pub watermark_epoch_in_db: IntGaugeVec,
pub watermark_checkpoint_in_db: IntGaugeVec,
pub watermark_transaction_in_db: IntGaugeVec,
pub watermark_timestamp_in_db_ms: IntGaugeVec,
pub watermark_reader_lo_in_db: IntGaugeVec,
pub watermark_pruner_hi_in_db: IntGaugeVec,
}

/// Collects information about the database connection pool.
Expand Down Expand Up @@ -319,6 +327,27 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
total_pruner_chunks_attempted: register_int_counter_vec_with_registry!(
"indexer_pruner_chunks_attempted",
"Number of chunks this pruner attempted to delete",
&["pipeline"],
registry,
)
amnn marked this conversation as resolved.
Show resolved Hide resolved
.unwrap(),
total_pruner_chunks_deleted: register_int_counter_vec_with_registry!(
"indexer_pruner_chunks_deleted",
"Number of chunks this pruner successfully deleted",
&["pipeline"],
registry,
)
.unwrap(),
total_pruner_rows_deleted: register_int_counter_vec_with_registry!(
"indexer_pruner_rows_deleted",
"Number of rows this pruner successfully deleted",
&["pipeline"],
registry,
)
.unwrap(),
collector_gather_latency: register_histogram_vec_with_registry!(
"indexer_collector_gather_latency",
"Time taken to gather rows into a batch by this collector",
Expand Down Expand Up @@ -359,6 +388,30 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
watermark_pruner_read_latency: register_histogram_vec_with_registry!(
"indexer_watermark_pruner_read_latency",
"Time taken to read pruner's next upper and lowerbounds from the database by this pruner",
&["pipeline"],
DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
watermark_pruner_write_latency: register_histogram_vec_with_registry!(
"indexer_watermark_pruner_write_latency",
"Time taken to write the pruner's new upperbound to the database by this pruner",
&["pipeline"],
DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
pruner_delete_latency: register_histogram_vec_with_registry!(
"indexer_pruner_delete_latency",
"Time taken to delete a chunk of data from the database by this pruner",
&["pipeline"],
DB_UPDATE_LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
watermark_epoch: register_int_gauge_vec_with_registry!(
"indexer_watermark_epoch",
"Current epoch high watermark for this committer",
Expand Down Expand Up @@ -394,6 +447,13 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
watermark_pruner_hi: register_int_gauge_vec_with_registry!(
"indexer_watermark_pruner_hi",
"Current pruner high watermark for this pruner",
&["pipeline"],
registry,
)
.unwrap(),
watermark_epoch_in_db: register_int_gauge_vec_with_registry!(
"indexer_watermark_epoch_in_db",
"Last epoch high watermark this committer wrote to the DB",
Expand Down Expand Up @@ -429,6 +489,13 @@ impl IndexerMetrics {
registry,
)
.unwrap(),
watermark_pruner_hi_in_db: register_int_gauge_vec_with_registry!(
"indexer_watermark_pruner_hi_in_db",
"Last pruner high watermark this pruner wrote to the DB",
&["pipeline"],
registry,
)
.unwrap(),
}
}

Expand Down
92 changes: 90 additions & 2 deletions crates/sui-indexer-alt/src/models/watermarks.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::borrow::Cow;
use std::{borrow::Cow, time::Duration};

use chrono::{naive::NaiveDateTime, DateTime, Utc};
use diesel::prelude::*;
use diesel::{dsl::sql, prelude::*, sql_types};
use diesel_async::RunQueryDsl;
use sui_field_count::FieldCount;

Expand Down Expand Up @@ -42,6 +42,24 @@ pub struct ReaderWatermark<'p> {
pub reader_lo: i64,
}

#[derive(Queryable, Debug, Clone, FieldCount)]
#[diesel(table_name = watermarks)]
pub struct PrunerWatermark<'p> {
/// The pipeline in question
pub pipeline: Cow<'p, str>,

/// How long to wait from when this query ran on the database until this information can be
/// used to prune the database. This number could be negative, meaning no waiting is necessary.
pub wait_for: i64,

/// The pruner can delete up to this checkpoint, (exclusive).
pub reader_lo: i64,

/// The pruner has already deleted up to this checkpoint (exclusive), so can continue from this
/// point.
pub pruner_hi: i64,
amnn marked this conversation as resolved.
Show resolved Hide resolved
}

impl StoredWatermark {
pub async fn get(
conn: &mut Connection<'_>,
Expand Down Expand Up @@ -129,6 +147,76 @@ impl<'p> ReaderWatermark<'p> {
}
}

impl PrunerWatermark<'static> {
/// Get the bounds for the region that the pruner still has to prune for the given `pipeline`,
/// along with a duration to wait before acting on this information, based on the time at which
/// the pruner last updated the bounds, and the configured `delay`.
///
/// The pruner is allowed to prune the region between the returned `pruner_hi` (inclusive) and
/// `reader_lo` (exclusive) after `wait_for` milliseconds have passed since this response was
/// returned.
pub async fn get(
conn: &mut Connection<'_>,
pipeline: &'static str,
delay: Duration,
) -> QueryResult<Option<Self>> {
// |---------- + delay ---------------------|
// |--- wait_for ---|
// |-----------------------|----------------|
// ^ ^
// pruner_timestamp NOW()
let wait_for = sql::<sql_types::BigInt>(&format!(
"CAST({} + 1000 * EXTRACT(EPOCH FROM pruner_timestamp - NOW()) AS BIGINT)",
delay.as_millis(),
));

watermarks::table
.select((
watermarks::pipeline,
wait_for,
watermarks::reader_lo,
watermarks::pruner_hi,
))
.filter(watermarks::pipeline.eq(pipeline))
.first(conn)
.await
.optional()
}
}

impl<'p> PrunerWatermark<'p> {
/// How long to wait before the pruner can act on this information, or `None`, if there is no
/// need to wait.
pub fn wait_for(&self) -> Option<Duration> {
(self.wait_for > 0).then(|| Duration::from_millis(self.wait_for as u64))
}

/// Whether the pruner has any work left to do on the range in this watermark.
pub fn is_empty(&self) -> bool {
self.pruner_hi >= self.reader_lo
}

/// The next chunk that the pruner should work on, to advance the watermark.
pub fn next_chunk(&mut self, size: u64) -> (u64, u64) {
let from = self.pruner_hi as u64;
let to = (from + size).min(self.reader_lo as u64);
(from, to)
}

/// Update the pruner high watermark (only) for an existing watermark row, as long as this
/// raises the watermark.
///
/// Returns a boolean indicating whether the watermark was actually updated or not.
pub async fn update(&self, conn: &mut Connection<'_>) -> QueryResult<bool> {
Ok(diesel::update(watermarks::table)
.set(watermarks::pruner_hi.eq(self.pruner_hi))
.filter(watermarks::pipeline.eq(&self.pipeline))
.execute(conn)
.await?
> 0)
}
}

impl<'p> From<CommitterWatermark<'p>> for StoredWatermark {
fn from(watermark: CommitterWatermark<'p>) -> Self {
StoredWatermark {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ pub(super) fn commit_watermark<H: Handler + 'static>(
}

if watermark.checkpoint_hi_inclusive > next_loud_watermark_update {
next_loud_watermark_update += LOUD_WATERMARK_UPDATE_INTERVAL;
next_loud_watermark_update = watermark.checkpoint_hi_inclusive + LOUD_WATERMARK_UPDATE_INTERVAL;

info!(
pipeline = H::NAME,
epoch = watermark.epoch_hi_inclusive,
Expand Down
26 changes: 23 additions & 3 deletions crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ use crate::{

use super::{processor::processor, PipelineConfig, Processor, WatermarkPart, PIPELINE_BUFFER};

use self::{collector::collector, commit_watermark::commit_watermark, committer::committer};
use self::{
collector::collector, commit_watermark::commit_watermark, committer::committer, pruner::pruner,
};

mod collector;
mod commit_watermark;
mod committer;
mod pruner;
mod reader_watermark;

/// The maximum number of watermarks that can show up in a single batch. This limit exists to deal
Expand Down Expand Up @@ -63,13 +66,23 @@ pub trait Handler: Processor {
/// affected.
async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>)
-> anyhow::Result<usize>;

/// Clean up data between checkpoints `_from` and `_to` (inclusive) in the database, returning
/// the number of rows affected. This function is optional, and defaults to not pruning at all.
async fn prune(_from: u64, _to: u64, _conn: &mut db::Connection<'_>) -> anyhow::Result<usize> {
Ok(0)
}
}

#[derive(Debug, Clone)]
pub struct PrunerConfig {
/// How often the pruner should check whether there is any data to prune.
pub interval: Duration,

/// How long to wait after the reader low watermark was set, until it is safe to prune up until
/// this new watermark.
pub delay: Duration,

/// How much data to keep, this is measured in checkpoints.
pub retention: u64,

Expand Down Expand Up @@ -181,12 +194,19 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
cancel,
);

let reader_watermark = reader_watermark::<H>(pruner_config, db, metrics, pruner_cancel.clone());
let reader_watermark = reader_watermark::<H>(
pruner_config.clone(),
db.clone(),
metrics.clone(),
pruner_cancel.clone(),
);

let pruner = pruner::<H>(pruner_config, db, metrics, pruner_cancel.clone());

tokio::spawn(async move {
let (_, _, _, _) = futures::join!(processor, collector, committer, commit_watermark);

pruner_cancel.cancel();
let _ = futures::join!(reader_watermark);
let _ = futures::join!(reader_watermark, pruner);
})
}
Loading
Loading