diff --git a/crates/sui-indexer-alt-framework/src/handlers/cp_sequence_numbers.rs b/crates/sui-indexer-alt-framework/src/handlers/cp_sequence_numbers.rs index 0948c74a9c789..06fea4a0c8968 100644 --- a/crates/sui-indexer-alt-framework/src/handlers/cp_sequence_numbers.rs +++ b/crates/sui-indexer-alt-framework/src/handlers/cp_sequence_numbers.rs @@ -3,23 +3,14 @@ use std::sync::Arc; +use crate::models::cp_sequence_numbers::StoredCpSequenceNumbers; use crate::pipeline::{concurrent::Handler, Processor}; use crate::schema::cp_sequence_numbers; use anyhow::Result; -use diesel::prelude::*; use diesel_async::RunQueryDsl; -use sui_field_count::FieldCount; use sui_pg_db::{self as db}; use sui_types::full_checkpoint_content::CheckpointData; -#[derive(Insertable, Selectable, Queryable, Debug, Clone, FieldCount)] -#[diesel(table_name = cp_sequence_numbers)] -pub struct StoredCpSequenceNumbers { - pub cp_sequence_number: i64, - pub tx_lo: i64, - pub epoch: i64, -} - pub struct CpSequenceNumbers; impl Processor for CpSequenceNumbers { diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index ad07b09718563..d0721deb84f12 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -26,6 +26,7 @@ use watermarks::CommitterWatermark; pub mod handlers; pub mod ingestion; pub(crate) mod metrics; +pub mod models; pub mod pipeline; pub(crate) mod schema; pub mod task; diff --git a/crates/sui-indexer-alt-framework/src/models/cp_sequence_numbers.rs b/crates/sui-indexer-alt-framework/src/models/cp_sequence_numbers.rs new file mode 100644 index 0000000000000..38dbac694db7e --- /dev/null +++ b/crates/sui-indexer-alt-framework/src/models/cp_sequence_numbers.rs @@ -0,0 +1,88 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::schema::cp_sequence_numbers; +use anyhow::{bail, Result}; +use diesel::prelude::*; +use diesel_async::RunQueryDsl; +use std::ops::Range; +use sui_field_count::FieldCount; +use sui_pg_db::Connection; + +#[derive(Insertable, Selectable, Queryable, Debug, Clone, FieldCount)] +#[diesel(table_name = cp_sequence_numbers)] +pub struct StoredCpSequenceNumbers { + pub cp_sequence_number: i64, + pub tx_lo: i64, + pub epoch: i64, +} + +/// Inclusive start and exclusive end range of prunable txs. +pub async fn tx_interval(conn: &mut Connection<'_>, cps: Range) -> Result> { + let result = get_range(conn, cps).await?; + + Ok(Range { + start: result.0.tx_lo as u64, + end: result.1.tx_lo as u64, + }) +} + +/// Inclusive start and exclusive end range of epochs. +/// +/// The two values in the tuple represent which epoch the `from` and `to` checkpoints come from, +/// respectively. +pub async fn epoch_interval(conn: &mut Connection<'_>, cps: Range) -> Result> { + let result = get_range(conn, cps).await?; + + Ok(Range { + start: result.0.epoch as u64, + end: result.1.epoch as u64, + }) +} + +/// Gets the tx and epoch mappings for the given checkpoint range. +/// +/// The values are expected to exist since the cp_mapping table must have enough information to +/// encompass the retention of other tables. +pub(crate) async fn get_range( + conn: &mut Connection<'_>, + cps: Range, +) -> Result<(StoredCpSequenceNumbers, StoredCpSequenceNumbers)> { + let Range { + start: from_cp, + end: to_cp, + } = cps; + + if from_cp >= to_cp { + bail!(format!( + "Invalid checkpoint range: `from` {from_cp} must be less than `to` {to_cp}" + )); + } + + let results = cp_sequence_numbers::table + .select(StoredCpSequenceNumbers::as_select()) + .filter(cp_sequence_numbers::cp_sequence_number.eq_any([from_cp as i64, to_cp as i64])) + .order(cp_sequence_numbers::cp_sequence_number.asc()) + .load::(conn) + .await + .map_err(anyhow::Error::from)?; + + let Some(from) = results + .iter() + .find(|cp| cp.cp_sequence_number == from_cp as i64) + else { + bail!(format!( + "No checkpoint mapping found for checkpoint {from_cp}" + )); + }; + let Some(to) = results + .iter() + .find(|cp| cp.cp_sequence_number == to_cp as i64) + else { + bail!(format!( + "No checkpoint mapping found for checkpoint {to_cp}" + )); + }; + + Ok((from.clone(), to.clone())) +} diff --git a/crates/sui-indexer-alt-framework/src/models/mod.rs b/crates/sui-indexer-alt-framework/src/models/mod.rs new file mode 100644 index 0000000000000..350ec29d5fd25 --- /dev/null +++ b/crates/sui-indexer-alt-framework/src/models/mod.rs @@ -0,0 +1,4 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +pub mod cp_sequence_numbers; diff --git a/crates/sui-indexer-alt/src/README.md b/crates/sui-indexer-alt/src/README.md index d4f0046058251..856b2d124ed72 100644 --- a/crates/sui-indexer-alt/src/README.md +++ b/crates/sui-indexer-alt/src/README.md @@ -6,3 +6,9 @@ The required flags are --remote-store-url (or --local-ingestion-path) and the -- ``` cargo run --bin sui-indexer-alt -- --database-url {url} indexer --remote-store-url https://checkpoints.mainnet.sui.io --skip-watermark --first-checkpoint 68918060 --last-checkpoint 68919060 --config indexer_alt_config.toml ``` + +## Pruning +To enable pruning, the `cp_sequence_numbers` pipeline must be enabled. Otherwise, even if pruning logic is +configured for a table, the pruner task itself will skip if it cannot find a mapping for the +checkpoint pruning watermark. Only one committer needs to update this table - it is not necessary +for every indexer instance to have this pipeline enabled.