Skip to content

Commit

Permalink
[indexer-alt] add cp_sequence_numbers table and handler to indexer-al…
Browse files Browse the repository at this point in the history
…t-framework, add as an optional pipeline to indexer-alt (#20626)

## Description 

Framework introduces `cp_sequence_numbers` which maps a given checkpoint
sequence number to its first tx and containing epoch. This will be used
later during pruning, for tables that need more info than the prunable
checkpoint range.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
wlmyng authored Dec 23, 2024
1 parent e287607 commit e62fa12
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 5 deletions.
1 change: 1 addition & 0 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ jobs:
- name: Indexer Alt Framework schema
run: |
./crates/sui-indexer-alt-framework/generate_schema.sh
cargo fmt -- crates/sui-indexer-alt-framework/src/schema.rs
# Ensure there are no uncommitted changes in the repo after running tests
- run: scripts/changed-files.sh
shell: bash
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS cp_sequence_numbers;
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This table maps a checkpoint sequence number to the containing epoch and first transaction
-- sequence number in the checkpoint.
CREATE TABLE IF NOT EXISTS cp_sequence_numbers
(
cp_sequence_number BIGINT PRIMARY KEY,
-- The network total transactions at the end of this checkpoint subtracted by the number of
-- transactions in the checkpoint.
tx_lo BIGINT NOT NULL,
-- The epoch this checkpoint belongs to.
epoch BIGINT NOT NULL
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use crate::pipeline::{concurrent::Handler, Processor};
use crate::schema::cp_sequence_numbers;
use anyhow::Result;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use sui_field_count::FieldCount;
use sui_pg_db::{self as db};
use sui_types::full_checkpoint_content::CheckpointData;

#[derive(Insertable, Selectable, Queryable, Debug, Clone, FieldCount)]
#[diesel(table_name = cp_sequence_numbers)]
pub struct StoredCpSequenceNumbers {
pub cp_sequence_number: i64,
pub tx_lo: i64,
pub epoch: i64,
}

pub struct CpSequenceNumbers;

impl Processor for CpSequenceNumbers {
const NAME: &'static str = "cp_sequence_numbers";

type Value = StoredCpSequenceNumbers;

fn process(&self, checkpoint: &Arc<CheckpointData>) -> Result<Vec<Self::Value>> {
let cp_sequence_number = checkpoint.checkpoint_summary.sequence_number as i64;
let network_total_transactions =
checkpoint.checkpoint_summary.network_total_transactions as i64;
let tx_lo = network_total_transactions - checkpoint.transactions.len() as i64;
let epoch = checkpoint.checkpoint_summary.epoch as i64;
Ok(vec![StoredCpSequenceNumbers {
cp_sequence_number,
tx_lo,
epoch,
}])
}
}

#[async_trait::async_trait]
impl Handler for CpSequenceNumbers {
async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Ok(diesel::insert_into(cp_sequence_numbers::table)
.values(values)
.on_conflict_do_nothing()
.execute(conn)
.await?)
}
}
4 changes: 4 additions & 0 deletions crates/sui-indexer-alt-framework/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

pub mod cp_sequence_numbers;
1 change: 1 addition & 0 deletions crates/sui-indexer-alt-framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use watermarks::CommitterWatermark;

pub mod handlers;
pub mod ingestion;
pub(crate) mod metrics;
pub mod pipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ use crate::{
use super::{Handler, PrunerConfig};

/// The pruner task is responsible for deleting old data from the database. It will periodically
/// check the `watermarks` table to see if there is any data that should be pruned -- between
/// `pruner_hi` (inclusive), and `reader_lo` (exclusive).
/// check the `watermarks` table to see if there is any data that should be pruned between the
/// `pruner_hi` (inclusive), and `reader_lo` (exclusive) checkpoints. This task will also provide a
/// mapping of the pruned checkpoints to their corresponding epoch and tx, which the handler can
/// then use to delete the corresponding data from the database.
///
/// To ensure that the pruner does not interfere with reads that are still in flight, it respects
/// the watermark's `pruner_timestamp`, which records the time that `reader_lo` was last updated.
/// The task will not prune data until at least `config.delay()` has passed since
/// `pruner_timestamp` to give in-flight reads time to land.
/// The task will not prune data until at least `config.delay()` has passed since `pruner_timestamp`
/// to give in-flight reads time to land.
///
/// The task regularly traces its progress, outputting at a higher log level every
/// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
Expand Down
10 changes: 10 additions & 0 deletions crates/sui-indexer-alt-framework/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
// SPDX-License-Identifier: Apache-2.0
// @generated automatically by Diesel CLI.

diesel::table! {
cp_sequence_numbers (cp_sequence_number) {
cp_sequence_number -> Int8,
tx_lo -> Int8,
epoch -> Int8,
}
}

diesel::table! {
watermarks (pipeline) {
pipeline -> Text,
Expand All @@ -14,3 +22,5 @@ diesel::table! {
pruner_hi -> Int8,
}
}

diesel::allow_tables_to_appear_in_same_query!(cp_sequence_numbers, watermarks,);
3 changes: 2 additions & 1 deletion crates/sui-indexer-alt-framework/src/watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ impl<'p> PrunerWatermark<'p> {
self.pruner_hi >= self.reader_lo
}

/// The next chunk that the pruner should work on, to advance the watermark.
/// The next chunk of checkpoints that the pruner should work on, to advance the watermark.
/// Returns a tuple (from, to) where `from` is inclusive and `to` is exclusive.
pub(crate) fn next_chunk(&mut self, size: u64) -> (u64, u64) {
let from = self.pruner_hi as u64;
let to = (from + size).min(self.reader_lo as u64);
Expand Down
8 changes: 8 additions & 0 deletions crates/sui-indexer-alt/src/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# sui-indexer-alt

## Running
The required flags are --remote-store-url (or --local-ingestion-path) and the --config. If both are provided, remote-store-url will be used.

```
cargo run --bin sui-indexer-alt -- --database-url {url} indexer --remote-store-url https://checkpoints.mainnet.sui.io --skip-watermark --first-checkpoint 68918060 --last-checkpoint 68919060 --config indexer_alt_config.toml
```
3 changes: 3 additions & 0 deletions crates/sui-indexer-alt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub struct PipelineLayer {
pub coin_balance_buckets_pruner: Option<ConcurrentLayer>,

// All concurrent pipelines
pub cp_sequence_numbers: Option<ConcurrentLayer>,
pub ev_emit_mod: Option<ConcurrentLayer>,
pub ev_struct_inst: Option<ConcurrentLayer>,
pub kv_checkpoints: Option<ConcurrentLayer>,
Expand Down Expand Up @@ -273,6 +274,7 @@ impl PipelineLayer {
obj_info_pruner: Some(Default::default()),
coin_balance_buckets: Some(Default::default()),
coin_balance_buckets_pruner: Some(Default::default()),
cp_sequence_numbers: Some(Default::default()),
ev_emit_mod: Some(Default::default()),
ev_struct_inst: Some(Default::default()),
kv_checkpoints: Some(Default::default()),
Expand Down Expand Up @@ -409,6 +411,7 @@ impl Merge for PipelineLayer {
coin_balance_buckets_pruner: self
.coin_balance_buckets_pruner
.merge(other.coin_balance_buckets_pruner),
cp_sequence_numbers: self.cp_sequence_numbers.merge(other.cp_sequence_numbers),
ev_emit_mod: self.ev_emit_mod.merge(other.ev_emit_mod),
ev_struct_inst: self.ev_struct_inst.merge(other.ev_struct_inst),
kv_checkpoints: self.kv_checkpoints.merge(other.kv_checkpoints),
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use handlers::{
tx_affected_objects::TxAffectedObjects, tx_balance_changes::TxBalanceChanges,
tx_calls::TxCalls, tx_digests::TxDigests, tx_kinds::TxKinds,
};
use sui_indexer_alt_framework::handlers::cp_sequence_numbers::CpSequenceNumbers;
use sui_indexer_alt_framework::ingestion::{ClientArgs, IngestionConfig};
use sui_indexer_alt_framework::pipeline::{
concurrent::{ConcurrentConfig, PrunerConfig},
Expand Down Expand Up @@ -62,6 +63,7 @@ pub async fn start_indexer(
obj_info_pruner,
coin_balance_buckets,
coin_balance_buckets_pruner,
cp_sequence_numbers,
ev_emit_mod,
ev_struct_inst,
kv_checkpoints,
Expand Down Expand Up @@ -201,6 +203,7 @@ pub async fn start_indexer(
);

// Unpruned concurrent pipelines
add_concurrent!(CpSequenceNumbers, cp_sequence_numbers);
add_concurrent!(EvEmitMod, ev_emit_mod);
add_concurrent!(EvStructInst, ev_struct_inst);
add_concurrent!(KvCheckpoints, kv_checkpoints);
Expand Down

0 comments on commit e62fa12

Please sign in to comment.