diff --git a/Cargo.lock b/Cargo.lock index 415bfa98597a4b..06296f82053969 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13719,6 +13719,7 @@ dependencies = [ "sui-sdk", "sui-snapshot", "sui-storage", + "sui-synthetic-ingestion", "sui-test-transaction-builder", "sui-transaction-builder", "sui-types", diff --git a/crates/sui-data-ingestion-core/src/worker_pool.rs b/crates/sui-data-ingestion-core/src/worker_pool.rs index ff4b38f47122fa..333e9bdaf41264 100644 --- a/crates/sui-data-ingestion-core/src/worker_pool.rs +++ b/crates/sui-data-ingestion-core/src/worker_pool.rs @@ -160,7 +160,7 @@ impl WorkerPool { // Wait for all workers to finish for join_handle in join_handles { - join_handle.await.expect("worker thread panicked"); + let _ = join_handle.await; } } } diff --git a/crates/sui-indexer/Cargo.toml b/crates/sui-indexer/Cargo.toml index fa4490741b1635..3470488d510f1f 100644 --- a/crates/sui-indexer/Cargo.toml +++ b/crates/sui-indexer/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] anyhow.workspace = true -rand = "0.8.5" +rand = "0.8.5" async-trait.workspace = true axum.workspace = true backoff.workspace = true @@ -64,6 +64,7 @@ sui-protocol-config.workspace = true telemetry-subscribers.workspace = true sui-rest-api.workspace = true sui-transaction-builder.workspace = true +sui-synthetic-ingestion.workspace = true move-core-types.workspace = true move-bytecode-utils.workspace = true diff --git a/crates/sui-indexer/src/benchmark.rs b/crates/sui-indexer/src/benchmark.rs new file mode 100644 index 00000000000000..dea0b4d00b1fbe --- /dev/null +++ b/crates/sui-indexer/src/benchmark.rs @@ -0,0 +1,115 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::config::{BenchmarkConfig, IngestionConfig, IngestionSources, UploadOptions}; +use crate::database::ConnectionPool; +use crate::db::{reset_database, run_migrations}; +use crate::errors::IndexerError; +use crate::indexer::Indexer; +use crate::metrics::IndexerMetrics; +use crate::store::PgIndexerStore; +use std::path::PathBuf; +use sui_synthetic_ingestion::benchmark::{run_benchmark, BenchmarkableIndexer}; +use sui_synthetic_ingestion::{IndexerProgress, SyntheticIngestionConfig}; +use tokio::sync::watch; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; + +pub async fn run_indexer_benchmark( + config: BenchmarkConfig, + pool: ConnectionPool, + metrics: IndexerMetrics, +) { + if config.reset_db { + reset_database(pool.dedicated_connection().await.unwrap()) + .await + .unwrap(); + } else { + run_migrations(pool.dedicated_connection().await.unwrap()) + .await + .unwrap(); + } + let store = PgIndexerStore::new(pool, UploadOptions::default(), metrics.clone()); + let ingestion_dir = tempfile::tempdir().unwrap().into_path(); + let synthetic_ingestion_config = SyntheticIngestionConfig { + ingestion_dir: ingestion_dir.clone(), + checkpoint_size: config.checkpoint_size, + num_checkpoints: config.num_checkpoints, + }; + let indexer = BenchmarkIndexer::new(store, metrics, ingestion_dir); + run_benchmark(synthetic_ingestion_config, indexer).await; +} + +pub struct BenchmarkIndexer { + inner: Option, + cancel: CancellationToken, + committed_checkpoints_rx: watch::Receiver>, + handle: Option>>, +} + +struct BenchmarkIndexerInner { + ingestion_dir: PathBuf, + store: PgIndexerStore, + metrics: IndexerMetrics, + committed_checkpoints_tx: watch::Sender>, +} + +impl BenchmarkIndexer { + pub fn new(store: PgIndexerStore, metrics: IndexerMetrics, ingestion_dir: PathBuf) -> Self { + let cancel = CancellationToken::new(); + let (committed_checkpoints_tx, committed_checkpoints_rx) = watch::channel(None); + Self { + inner: Some(BenchmarkIndexerInner { + ingestion_dir, + store, + metrics, + committed_checkpoints_tx, + }), + cancel, + committed_checkpoints_rx, + handle: None, + } + } +} + +#[async_trait::async_trait] +impl BenchmarkableIndexer for BenchmarkIndexer { + fn subscribe_to_committed_checkpoints(&self) -> watch::Receiver> { + self.committed_checkpoints_rx.clone() + } + + async fn start(&mut self) { + let BenchmarkIndexerInner { + ingestion_dir, + store, + metrics, + committed_checkpoints_tx, + } = self.inner.take().unwrap(); + let ingestion_config = IngestionConfig { + sources: IngestionSources { + data_ingestion_path: Some(ingestion_dir), + ..Default::default() + }, + ..Default::default() + }; + let cancel = self.cancel.clone(); + let handle = tokio::task::spawn(async move { + Indexer::start_writer( + ingestion_config, + store, + metrics, + Default::default(), + None, + cancel, + Some(committed_checkpoints_tx), + ) + .await + }); + self.handle = Some(handle); + } + + async fn stop(mut self) { + self.cancel.cancel(); + self.handle.unwrap().await.unwrap().unwrap(); + } +} diff --git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs index f51d18ab1ff884..b19749282910f4 100644 --- a/crates/sui-indexer/src/config.rs +++ b/crates/sui-indexer/src/config.rs @@ -210,6 +210,7 @@ pub enum Command { }, /// Restore the database from formal snaphots. Restore(RestoreConfig), + Benchmark(BenchmarkConfig), } #[derive(Args, Default, Debug, Clone)] @@ -378,6 +379,28 @@ impl Default for RestoreConfig { } } +#[derive(Args, Debug, Clone)] +pub struct BenchmarkConfig { + #[arg( + long, + default_value_t = 200, + help = "Number of transactions in a checkpoint." + )] + pub checkpoint_size: u64, + #[arg( + long, + default_value_t = 2000, + help = "Total number of synthetic checkpoints to generate." + )] + pub num_checkpoints: u64, + #[arg( + long, + default_value_t = false, + help = "Whether to reset the database before running." + )] + pub reset_db: bool, +} + #[cfg(test)] mod test { use super::*; diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index be4e0d375a923e..3d045840a87642 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -14,6 +14,7 @@ use move_core_types::language_storage::{StructTag, TypeTag}; use mysten_metrics::{get_metrics, spawn_monitored_task}; use sui_data_ingestion_core::Worker; use sui_rest_api::{CheckpointData, CheckpointTransaction}; +use sui_synthetic_ingestion::IndexerProgress; use sui_types::dynamic_field::DynamicFieldType; use sui_types::effects::{ObjectChange, TransactionEffectsAPI}; use sui_types::event::SystemEpochInfoEvent; @@ -24,6 +25,7 @@ use sui_types::object::Object; use sui_types::object::Owner; use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait}; use sui_types::transaction::TransactionDataAPI; +use tokio::sync::watch; use crate::errors::IndexerError; use crate::handlers::committer::start_tx_checkpoint_commit_task; @@ -50,6 +52,7 @@ pub async fn new_handlers( metrics: IndexerMetrics, next_checkpoint_sequence_number: CheckpointSequenceNumber, cancel: CancellationToken, + committed_checkpoints_tx: Option>>, ) -> Result { let checkpoint_queue_size = std::env::var("CHECKPOINT_QUEUE_SIZE") .unwrap_or(CHECKPOINT_QUEUE_SIZE.to_string()) @@ -71,7 +74,8 @@ pub async fn new_handlers( metrics_clone, indexed_checkpoint_receiver, next_checkpoint_sequence_number, - cancel.clone() + cancel.clone(), + committed_checkpoints_tx )); Ok(CheckpointHandler::new( state, diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index ad9df09be48941..6c08fda947667b 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -3,14 +3,16 @@ use std::collections::{BTreeMap, HashMap}; +use sui_synthetic_ingestion::IndexerProgress; +use sui_types::messages_checkpoint::CheckpointSequenceNumber; use tap::tap::TapFallible; +use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tracing::instrument; use tracing::{error, info}; -use sui_types::messages_checkpoint::CheckpointSequenceNumber; - use crate::metrics::IndexerMetrics; +use crate::models::raw_checkpoints::StoredRawCheckpoint; use crate::store::IndexerStore; use crate::types::IndexerResult; @@ -24,6 +26,7 @@ pub async fn start_tx_checkpoint_commit_task( tx_indexing_receiver: mysten_metrics::metered_channel::Receiver, mut next_checkpoint_sequence_number: CheckpointSequenceNumber, cancel: CancellationToken, + mut committed_checkpoints_tx: Option>>, ) -> IndexerResult<()> where S: IndexerStore + Clone + Sync + Send + 'static, @@ -60,7 +63,14 @@ where // The batch will consist of contiguous checkpoints and at most one epoch boundary at // the end. if batch.len() == checkpoint_commit_batch_size || epoch.is_some() { - commit_checkpoints(&state, batch, epoch, &metrics).await; + commit_checkpoints( + &state, + batch, + epoch, + &metrics, + &mut committed_checkpoints_tx, + ) + .await; batch = vec![]; } if let Some(epoch_number) = epoch_number_option { @@ -74,7 +84,7 @@ where } } if !batch.is_empty() { - commit_checkpoints(&state, batch, None, &metrics).await; + commit_checkpoints(&state, batch, None, &metrics, &mut committed_checkpoints_tx).await; batch = vec![]; } } @@ -95,6 +105,7 @@ async fn commit_checkpoints( indexed_checkpoint_batch: Vec, epoch: Option, metrics: &IndexerMetrics, + committed_checkpoints_tx: &mut Option>>, ) where S: IndexerStore + Clone + Sync + Send + 'static, { @@ -135,8 +146,13 @@ async fn commit_checkpoints( packages_batch.push(packages); } - let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number; - let committer_watermark = CommitterWatermark::from(checkpoint_batch.last().unwrap()); + let first_checkpoint_seq = checkpoint_batch.first().unwrap().sequence_number; + let last_checkpoint = checkpoint_batch.last().unwrap(); + let indexer_progress = IndexerProgress { + checkpoint: last_checkpoint.sequence_number, + network_total_transactions: last_checkpoint.network_total_transactions, + }; + let committer_watermark = CommitterWatermark::from(last_checkpoint); let guard = metrics.checkpoint_db_commit_latency.start_timer(); let tx_batch = tx_batch.into_iter().flatten().collect::>(); @@ -156,7 +172,7 @@ async fn commit_checkpoints( let raw_checkpoints_batch = checkpoint_batch .iter() .map(|c| c.into()) - .collect::>(); + .collect::>(); { let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer(); @@ -266,4 +282,13 @@ async fn commit_checkpoints( metrics .thousand_transaction_avg_db_commit_latency .observe(elapsed * 1000.0 / tx_count as f64); + + if let Some(committed_checkpoints_tx) = committed_checkpoints_tx.as_mut() { + if let Err(err) = committed_checkpoints_tx.send(Some(indexer_progress)) { + error!( + "Failed to send committed checkpoints to the watch channel: {}", + err + ); + } + } } diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs index 240e2951790942..987382fbd3f5d6 100644 --- a/crates/sui-indexer/src/indexer.rs +++ b/crates/sui-indexer/src/indexer.rs @@ -6,7 +6,7 @@ use std::env; use anyhow::Result; use prometheus::Registry; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, watch}; use tokio_util::sync::CancellationToken; use tracing::info; @@ -16,6 +16,7 @@ use mysten_metrics::spawn_monitored_task; use sui_data_ingestion_core::{ DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, WorkerPool, }; +use sui_synthetic_ingestion::IndexerProgress; use sui_types::messages_checkpoint::CheckpointSequenceNumber; use crate::build_json_rpc_server; @@ -33,12 +34,13 @@ pub struct Indexer; impl Indexer { pub async fn start_writer( - config: &IngestionConfig, + config: IngestionConfig, store: PgIndexerStore, metrics: IndexerMetrics, snapshot_config: SnapshotLagConfig, retention_config: Option, cancel: CancellationToken, + committed_checkpoints_tx: Option>>, ) -> Result<(), IndexerError> { info!( "Sui Indexer Writer (version {:?}) started...", @@ -98,7 +100,14 @@ impl Indexer { 2, DataIngestionMetrics::new(&Registry::new()), ); - let worker = new_handlers(store, metrics, primary_watermark, cancel.clone()).await?; + let worker = new_handlers( + store, + metrics, + primary_watermark, + cancel.clone(), + committed_checkpoints_tx, + ) + .await?; let worker_pool = WorkerPool::new( worker, "primary".to_string(), diff --git a/crates/sui-indexer/src/lib.rs b/crates/sui-indexer/src/lib.rs index e759370c727985..f40b0fdfcfb8a5 100644 --- a/crates/sui-indexer/src/lib.rs +++ b/crates/sui-indexer/src/lib.rs @@ -27,6 +27,7 @@ use errors::IndexerError; pub mod apis; pub mod backfill; +pub mod benchmark; pub mod config; pub mod database; pub mod db; diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 8978d072d8dead..d5281b3dffd6d4 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -3,6 +3,7 @@ use clap::Parser; use sui_indexer::backfill::backfill_runner::BackfillRunner; +use sui_indexer::benchmark::run_indexer_benchmark; use sui_indexer::config::{Command, UploadOptions}; use sui_indexer::database::ConnectionPool; use sui_indexer::db::{ @@ -55,12 +56,13 @@ async fn main() -> anyhow::Result<()> { let store = PgIndexerStore::new(pool, upload_options, indexer_metrics.clone()); Indexer::start_writer( - &ingestion_config, + ingestion_config, store, indexer_metrics, snapshot_config, retention_config, CancellationToken::new(), + None, ) .await?; } @@ -98,6 +100,9 @@ async fn main() -> anyhow::Result<()> { IndexerFormalSnapshotRestorer::new(store, restore_config).await?; formal_restorer.restore().await?; } + Command::Benchmark(benchmark_config) => { + run_indexer_benchmark(benchmark_config, pool, indexer_metrics).await; + } } Ok(()) diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs index 6a208f8e4c6db5..40ad5cab4f4f00 100644 --- a/crates/sui-indexer/src/test_utils.rs +++ b/crates/sui-indexer/src/test_utils.rs @@ -123,12 +123,13 @@ pub async fn start_indexer_writer_for_testing( tokio::spawn(async move { Indexer::start_writer( - &ingestion_config, + ingestion_config, store_clone, indexer_metrics, snapshot_config, retention_config, token_clone, + None, ) .await })