Skip to content

Commit

Permalink
[indexer] Standalone synthetic ingestion (#20270)
Browse files Browse the repository at this point in the history
## Description 

Decouples ingestion and benchmarking, remove the benchmark related code
from sui-indexer crate.
After this change, we will always first run synthetic ingestion to
generate a workload, and then separately, run indexer to benchmark it.
This has a few benefits:
1. We no longer need to maintain compatibility between indexer and
indexer-alt in terms of benchmark integration. This is good because it
is a lot easier to benchmark indexer-alt, since it supports stopping at
a specific checkpoint.
2. This will make it easier to use different types of ingestion
workload.

Also cleaned up the ingestion generation code by allowing generation
from checkpoint 0, and made sure that every checkpoint has the same
number of transactions.

## Test plan 

Updated tests.
---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Nov 20, 2024
1 parent 1f08986 commit 0241db6
Show file tree
Hide file tree
Showing 25 changed files with 143 additions and 643 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/simulacrum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,10 @@ impl<R, S: store::SimulatorStore> Simulacrum<R, S> {
.unwrap();
}

pub fn override_last_checkpoint_number(&mut self, number: CheckpointSequenceNumber) {
pub fn override_next_checkpoint_number(&mut self, number: CheckpointSequenceNumber) {
let committee = CommitteeWithKeys::new(&self.keystore, self.epoch_state.committee());
self.checkpoint_builder
.override_last_checkpoint_number(number, &committee);
.override_next_checkpoint_number(number, &committee);
}

fn process_data_ingestion(
Expand Down
131 changes: 0 additions & 131 deletions crates/sui-indexer/src/benchmark.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ pub enum Command {
},
/// Restore the database from formal snaphots.
Restore(RestoreConfig),
Benchmark(BenchmarkConfig),
}

#[derive(Args, Default, Debug, Clone)]
Expand Down
4 changes: 0 additions & 4 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use move_core_types::language_storage::{StructTag, TypeTag};
use mysten_metrics::{get_metrics, spawn_monitored_task};
use sui_data_ingestion_core::Worker;
use sui_rest_api::{CheckpointData, CheckpointTransaction};
use sui_synthetic_ingestion::IndexerProgress;
use sui_types::dynamic_field::DynamicFieldType;
use sui_types::effects::{ObjectChange, TransactionEffectsAPI};
use sui_types::event::SystemEpochInfoEvent;
Expand All @@ -25,7 +24,6 @@ use sui_types::object::Object;
use sui_types::object::Owner;
use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait};
use sui_types::transaction::TransactionDataAPI;
use tokio::sync::watch;

use crate::errors::IndexerError;
use crate::handlers::committer::start_tx_checkpoint_commit_task;
Expand All @@ -51,7 +49,6 @@ pub async fn new_handlers(
state: PgIndexerStore,
metrics: IndexerMetrics,
cancel: CancellationToken,
committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
start_checkpoint_opt: Option<CheckpointSequenceNumber>,
end_checkpoint_opt: Option<CheckpointSequenceNumber>,
mvr_mode: bool,
Expand Down Expand Up @@ -85,7 +82,6 @@ pub async fn new_handlers(
metrics_clone,
indexed_checkpoint_receiver,
cancel.clone(),
committed_checkpoints_tx,
start_checkpoint,
end_checkpoint_opt,
mvr_mode
Expand Down
37 changes: 2 additions & 35 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@

use std::collections::{BTreeMap, HashMap};

use sui_synthetic_ingestion::IndexerProgress;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::tap::TapFallible;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use tracing::{error, info};
Expand All @@ -25,7 +23,6 @@ pub async fn start_tx_checkpoint_commit_task<S>(
metrics: IndexerMetrics,
tx_indexing_receiver: mysten_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
cancel: CancellationToken,
mut committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
end_checkpoint_opt: Option<CheckpointSequenceNumber>,
mvr_mode: bool,
Expand Down Expand Up @@ -65,15 +62,7 @@ where
// The batch will consist of contiguous checkpoints and at most one epoch boundary at
// the end.
if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
commit_checkpoints(
&state,
batch,
epoch,
&metrics,
&mut committed_checkpoints_tx,
mvr_mode,
)
.await;
commit_checkpoints(&state, batch, epoch, &metrics, mvr_mode).await;
batch = vec![];
}
if let Some(epoch_number) = epoch_number_option {
Expand All @@ -93,15 +82,7 @@ where
}
}
if !batch.is_empty() {
commit_checkpoints(
&state,
batch,
None,
&metrics,
&mut committed_checkpoints_tx,
mvr_mode,
)
.await;
commit_checkpoints(&state, batch, None, &metrics, mvr_mode).await;
batch = vec![];
}

Expand Down Expand Up @@ -129,7 +110,6 @@ async fn commit_checkpoints<S>(
indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
epoch: Option<EpochToCommit>,
metrics: &IndexerMetrics,
committed_checkpoints_tx: &mut Option<watch::Sender<Option<IndexerProgress>>>,
mvr_mode: bool,
) where
S: IndexerStore + Clone + Sync + Send + 'static,
Expand Down Expand Up @@ -176,10 +156,6 @@ async fn commit_checkpoints<S>(

let first_checkpoint_seq = checkpoint_batch.first().unwrap().sequence_number;
let last_checkpoint = checkpoint_batch.last().unwrap();
let indexer_progress = IndexerProgress {
checkpoint: last_checkpoint.sequence_number,
network_total_transactions: last_checkpoint.network_total_transactions,
};
let committer_watermark = CommitterWatermark::from(last_checkpoint);

let guard = metrics.checkpoint_db_commit_latency.start_timer();
Expand Down Expand Up @@ -311,13 +287,4 @@ async fn commit_checkpoints<S>(
metrics
.thousand_transaction_avg_db_commit_latency
.observe(elapsed * 1000.0 / tx_count as f64);

if let Some(committed_checkpoints_tx) = committed_checkpoints_tx.as_mut() {
if let Err(err) = committed_checkpoints_tx.send(Some(indexer_progress)) {
error!(
"Failed to send committed checkpoints to the watch channel: {}",
err
);
}
}
}
5 changes: 1 addition & 4 deletions crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::env;

use anyhow::Result;
use prometheus::Registry;
use tokio::sync::{oneshot, watch};
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

Expand All @@ -16,7 +16,6 @@ use mysten_metrics::spawn_monitored_task;
use sui_data_ingestion_core::{
DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, WorkerPool,
};
use sui_synthetic_ingestion::IndexerProgress;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::build_json_rpc_server;
Expand All @@ -40,7 +39,6 @@ impl Indexer {
snapshot_config: SnapshotLagConfig,
mut retention_config: Option<RetentionConfig>,
cancel: CancellationToken,
committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
mvr_mode: bool,
) -> Result<(), IndexerError> {
info!(
Expand Down Expand Up @@ -99,7 +97,6 @@ impl Indexer {
store,
metrics,
cancel.clone(),
committed_checkpoints_tx,
config.start_checkpoint,
config.end_checkpoint,
mvr_mode,
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use errors::IndexerError;

pub mod apis;
pub mod backfill;
pub mod benchmark;
pub mod config;
pub mod database;
pub mod db;
Expand Down
5 changes: 0 additions & 5 deletions crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use clap::Parser;
use sui_indexer::backfill::backfill_runner::BackfillRunner;
use sui_indexer::benchmark::run_indexer_benchmark;
use sui_indexer::config::{Command, RetentionConfig, UploadOptions};
use sui_indexer::database::ConnectionPool;
use sui_indexer::db::setup_postgres::clear_database;
Expand Down Expand Up @@ -73,7 +72,6 @@ async fn main() -> anyhow::Result<()> {
snapshot_config,
retention_config,
CancellationToken::new(),
None,
mvr_mode,
)
.await?;
Expand Down Expand Up @@ -119,9 +117,6 @@ async fn main() -> anyhow::Result<()> {
IndexerFormalSnapshotRestorer::new(store, restore_config).await?;
formal_restorer.restore().await?;
}
Command::Benchmark(benchmark_config) => {
run_indexer_benchmark(benchmark_config, pool, indexer_metrics).await;
}
}

Ok(())
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ pub async fn start_indexer_writer_for_testing_with_mvr_mode(
snapshot_config,
retention_config,
token_clone,
None,
mvr_mode,
)
.await
Expand Down
Loading

0 comments on commit 0241db6

Please sign in to comment.