Skip to content

Commit

Permalink
[indexer-alt] Add coin_balance_buckets_pruner (#20568)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Dec 11, 2024
1 parent a5ce934 commit 481390c
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 4 deletions.
17 changes: 13 additions & 4 deletions crates/sui-indexer-alt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ pub struct PipelineLayer {
pub sum_packages: Option<SequentialLayer>,

// Concurrent pipelines with a lagged consistent pruner which is also a concurrent pipeline.
// Use concurrent layer for the pruner pipelines so that they could override checkpoint lag if needed.
pub obj_info: Option<CommitterLayer>,
pub obj_info_pruner: Option<ConcurrentLayer>,
pub coin_balance_buckets: Option<CommitterLayer>,
pub coin_balance_buckets_pruner: Option<ConcurrentLayer>,

// All concurrent pipelines
pub ev_emit_mod: Option<ConcurrentLayer>,
Expand Down Expand Up @@ -289,6 +292,10 @@ impl PipelineLayer {
wal_obj_types: Some(Default::default()),
sum_displays: Some(Default::default()),
sum_packages: Some(Default::default()),
obj_info: Some(Default::default()),
obj_info_pruner: Some(Default::default()),
coin_balance_buckets: Some(Default::default()),
coin_balance_buckets_pruner: Some(Default::default()),
ev_emit_mod: Some(Default::default()),
ev_struct_inst: Some(Default::default()),
kv_checkpoints: Some(Default::default()),
Expand All @@ -298,8 +305,6 @@ impl PipelineLayer {
kv_objects: Some(Default::default()),
kv_protocol_configs: Some(Default::default()),
kv_transactions: Some(Default::default()),
obj_info: Some(Default::default()),
obj_info_pruner: Some(Default::default()),
obj_versions: Some(Default::default()),
tx_affected_addresses: Some(Default::default()),
tx_affected_objects: Some(Default::default()),
Expand Down Expand Up @@ -429,6 +434,12 @@ impl Merge for PipelineLayer {
wal_obj_types: self.wal_obj_types.merge(other.wal_obj_types),
sum_displays: self.sum_displays.merge(other.sum_displays),
sum_packages: self.sum_packages.merge(other.sum_packages),
obj_info: self.obj_info.merge(other.obj_info),
obj_info_pruner: self.obj_info_pruner.merge(other.obj_info_pruner),
coin_balance_buckets: self.coin_balance_buckets.merge(other.coin_balance_buckets),
coin_balance_buckets_pruner: self
.coin_balance_buckets_pruner
.merge(other.coin_balance_buckets_pruner),
ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod),
ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst),
kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints),
Expand All @@ -438,8 +449,6 @@ impl Merge for PipelineLayer {
kv_objects: self.kv_objects.merge(other.kv_objects),
kv_protocol_configs: self.kv_protocol_configs.merge(other.kv_protocol_configs),
kv_transactions: self.kv_transactions.merge(other.kv_transactions),
obj_info: self.obj_info.merge(other.obj_info),
obj_info_pruner: self.obj_info_pruner.merge(other.obj_info_pruner),
obj_versions: self.obj_versions.merge(other.obj_versions),
tx_affected_addresses: self
.tx_affected_addresses
Expand Down
52 changes: 52 additions & 0 deletions crates/sui-indexer-alt/src/handlers/coin_balance_buckets_pruner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, sync::Arc};

use anyhow::Result;
use diesel::ExpressionMethods;
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::schema::coin_balance_buckets;
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;

use super::coin_balance_buckets::{
CoinBalanceBucketChangeKind, CoinBalanceBuckets, ProcessedCoinBalanceBucket,
};

pub(crate) struct CoinBalanceBucketsPruner;

impl Processor for CoinBalanceBucketsPruner {
const NAME: &'static str = "coin_balance_buckets_pruner";
type Value = ProcessedCoinBalanceBucket;

fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
CoinBalanceBuckets.process(checkpoint)
}
}

#[async_trait::async_trait]
impl Handler for CoinBalanceBucketsPruner {
async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
let mut to_prune = BTreeMap::new();
for v in values {
let object_id = v.object_id;
let cp_sequence_number_exclusive = match v.change {
CoinBalanceBucketChangeKind::Insert { .. } => v.cp_sequence_number,
CoinBalanceBucketChangeKind::Delete => v.cp_sequence_number + 1,
} as i64;
let cp = to_prune.entry(object_id).or_default();
*cp = std::cmp::max(*cp, cp_sequence_number_exclusive);
}
let mut committed_rows = 0;
for (object_id, cp_sequence_number_exclusive) in to_prune {
committed_rows += diesel::delete(coin_balance_buckets::table)
.filter(coin_balance_buckets::object_id.eq(object_id.as_slice()))
.filter(coin_balance_buckets::cp_sequence_number.lt(cp_sequence_number_exclusive))
.execute(conn)
.await?;
}
Ok(committed_rows)
}
}
1 change: 1 addition & 0 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod coin_balance_buckets;
pub(crate) mod coin_balance_buckets_pruner;
pub(crate) mod ev_emit_mod;
pub(crate) mod ev_struct_inst;
pub(crate) mod kv_checkpoints;
Expand Down
9 changes: 9 additions & 0 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
use anyhow::Context;
use bootstrap::bootstrap;
use config::{ConsistencyConfig, IndexerConfig, PipelineLayer};
use handlers::coin_balance_buckets::CoinBalanceBuckets;
use handlers::coin_balance_buckets_pruner::CoinBalanceBucketsPruner;
use handlers::obj_info_pruner::ObjInfoPruner;
use handlers::{
ev_emit_mod::EvEmitMod, ev_struct_inst::EvStructInst, kv_checkpoints::KvCheckpoints,
Expand Down Expand Up @@ -63,6 +65,8 @@ pub async fn start_indexer(
sum_packages,
obj_info,
obj_info_pruner,
coin_balance_buckets,
coin_balance_buckets_pruner,
ev_emit_mod,
ev_struct_inst,
kv_checkpoints,
Expand Down Expand Up @@ -255,6 +259,11 @@ pub async fn start_indexer(
ObjInfoPruner, obj_info_pruner
);

add_concurrent_with_lagged_pruner!(
CoinBalanceBuckets, coin_balance_buckets;
CoinBalanceBucketsPruner, coin_balance_buckets_pruner
);

// Unpruned concurrent pipelines
add_concurrent!(EvEmitMod, ev_emit_mod);
add_concurrent!(EvStructInst, ev_struct_inst);
Expand Down

0 comments on commit 481390c

Please sign in to comment.