diff --git a/crates/sui-indexer-alt/src/config.rs b/crates/sui-indexer-alt/src/config.rs index 4a38af81d371f..ab297db368069 100644 --- a/crates/sui-indexer-alt/src/config.rs +++ b/crates/sui-indexer-alt/src/config.rs @@ -148,8 +148,11 @@ pub struct PipelineLayer { pub sum_packages: Option, // Concurrent pipelines with a lagged consistent pruner which is also a concurrent pipeline. + // Use concurrent layer for the pruner pipelines so that they could override checkpoint lag if needed. pub obj_info: Option, pub obj_info_pruner: Option, + pub coin_balance_buckets: Option, + pub coin_balance_buckets_pruner: Option, // All concurrent pipelines pub ev_emit_mod: Option, @@ -289,6 +292,10 @@ impl PipelineLayer { wal_obj_types: Some(Default::default()), sum_displays: Some(Default::default()), sum_packages: Some(Default::default()), + obj_info: Some(Default::default()), + obj_info_pruner: Some(Default::default()), + coin_balance_buckets: Some(Default::default()), + coin_balance_buckets_pruner: Some(Default::default()), ev_emit_mod: Some(Default::default()), ev_struct_inst: Some(Default::default()), kv_checkpoints: Some(Default::default()), @@ -298,8 +305,6 @@ impl PipelineLayer { kv_objects: Some(Default::default()), kv_protocol_configs: Some(Default::default()), kv_transactions: Some(Default::default()), - obj_info: Some(Default::default()), - obj_info_pruner: Some(Default::default()), obj_versions: Some(Default::default()), tx_affected_addresses: Some(Default::default()), tx_affected_objects: Some(Default::default()), @@ -429,6 +434,12 @@ impl Merge for PipelineLayer { wal_obj_types: self.wal_obj_types.merge(other.wal_obj_types), sum_displays: self.sum_displays.merge(other.sum_displays), sum_packages: self.sum_packages.merge(other.sum_packages), + obj_info: self.obj_info.merge(other.obj_info), + obj_info_pruner: self.obj_info_pruner.merge(other.obj_info_pruner), + coin_balance_buckets: self.coin_balance_buckets.merge(other.coin_balance_buckets), + coin_balance_buckets_pruner: self + .coin_balance_buckets_pruner + .merge(other.coin_balance_buckets_pruner), ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod), ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst), kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints), @@ -438,8 +449,6 @@ impl Merge for PipelineLayer { kv_objects: self.kv_objects.merge(other.kv_objects), kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs), kv_transactions: self.kv_transactions.merge(other.kv_transactions), - obj_info: self.obj_info.merge(other.obj_info), - obj_info_pruner: self.obj_info_pruner.merge(other.obj_info_pruner), obj_versions: self.obj_versions.merge(other.obj_versions), tx_affected_addresses: self .tx_affected_addresses diff --git a/crates/sui-indexer-alt/src/handlers/coin_balance_buckets_pruner.rs b/crates/sui-indexer-alt/src/handlers/coin_balance_buckets_pruner.rs new file mode 100644 index 0000000000000..0e063a4b955d9 --- /dev/null +++ b/crates/sui-indexer-alt/src/handlers/coin_balance_buckets_pruner.rs @@ -0,0 +1,52 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use std::{collections::BTreeMap, sync::Arc}; + +use anyhow::Result; +use diesel::ExpressionMethods; +use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_schema::schema::coin_balance_buckets; +use sui_pg_db as db; +use sui_types::full_checkpoint_content::CheckpointData; + +use super::coin_balance_buckets::{ + CoinBalanceBucketChangeKind, CoinBalanceBuckets, ProcessedCoinBalanceBucket, +}; + +pub(crate) struct CoinBalanceBucketsPruner; + +impl Processor for CoinBalanceBucketsPruner { + const NAME: &'static str = "coin_balance_buckets_pruner"; + type Value = ProcessedCoinBalanceBucket; + + fn process(&self, checkpoint: &Arc) -> Result> { + CoinBalanceBuckets.process(checkpoint) + } +} + +#[async_trait::async_trait] +impl Handler for CoinBalanceBucketsPruner { + async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result { + let mut to_prune = BTreeMap::new(); + for v in values { + let object_id = v.object_id; + let cp_sequence_number_exclusive = match v.change { + CoinBalanceBucketChangeKind::Insert { .. } => v.cp_sequence_number, + CoinBalanceBucketChangeKind::Delete => v.cp_sequence_number + 1, + } as i64; + let cp = to_prune.entry(object_id).or_default(); + *cp = std::cmp::max(*cp, cp_sequence_number_exclusive); + } + let mut committed_rows = 0; + for (object_id, cp_sequence_number_exclusive) in to_prune { + committed_rows += diesel::delete(coin_balance_buckets::table) + .filter(coin_balance_buckets::object_id.eq(object_id.as_slice())) + .filter(coin_balance_buckets::cp_sequence_number.lt(cp_sequence_number_exclusive)) + .execute(conn) + .await?; + } + Ok(committed_rows) + } +} diff --git a/crates/sui-indexer-alt/src/handlers/mod.rs b/crates/sui-indexer-alt/src/handlers/mod.rs index c4c2684e6f9eb..3897d29838964 100644 --- a/crates/sui-indexer-alt/src/handlers/mod.rs +++ b/crates/sui-indexer-alt/src/handlers/mod.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 pub(crate) mod coin_balance_buckets; +pub(crate) mod coin_balance_buckets_pruner; pub(crate) mod ev_emit_mod; pub(crate) mod ev_struct_inst; pub(crate) mod kv_checkpoints; diff --git a/crates/sui-indexer-alt/src/lib.rs b/crates/sui-indexer-alt/src/lib.rs index 1483f8a0ac1ba..4889d99bc83e2 100644 --- a/crates/sui-indexer-alt/src/lib.rs +++ b/crates/sui-indexer-alt/src/lib.rs @@ -4,6 +4,8 @@ use anyhow::Context; use bootstrap::bootstrap; use config::{ConsistencyConfig, IndexerConfig, PipelineLayer}; +use handlers::coin_balance_buckets::CoinBalanceBuckets; +use handlers::coin_balance_buckets_pruner::CoinBalanceBucketsPruner; use handlers::obj_info_pruner::ObjInfoPruner; use handlers::{ ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints, @@ -63,6 +65,8 @@ pub async fn start_indexer( sum_packages, obj_info, obj_info_pruner, + coin_balance_buckets, + coin_balance_buckets_pruner, ev_emit_mod, ev_struct_inst, kv_checkpoints, @@ -255,6 +259,11 @@ pub async fn start_indexer( ObjInfoPruner, obj_info_pruner ); + add_concurrent_with_lagged_pruner!( + CoinBalanceBuckets, coin_balance_buckets; + CoinBalanceBucketsPruner, coin_balance_buckets_pruner + ); + // Unpruned concurrent pipelines add_concurrent!(EvEmitMod, ev_emit_mod); add_concurrent!(EvStructInst, ev_struct_inst);