From 155a4cae6308b07a69e1d528ac114addd562552c Mon Sep 17 00:00:00 2001 From: Xun Li Date: Wed, 16 Oct 2024 14:16:48 -0700 Subject: [PATCH 1/2] Add synthetic ingestion and benchmark framework --- Cargo.lock | 13 ++ Cargo.toml | 2 + crates/simulacrum/src/lib.rs | 6 + crates/sui-synthetic-ingestion/Cargo.toml | 18 +++ .../sui-synthetic-ingestion/src/benchmark.rs | 150 ++++++++++++++++++ crates/sui-synthetic-ingestion/src/lib.rs | 29 ++++ .../src/synthetic_ingestion.rs | 56 +++++++ .../src/tps_tracker.rs | 80 ++++++++++ .../sui-types/src/mock_checkpoint_builder.rs | 11 ++ 9 files changed, 365 insertions(+) create mode 100644 crates/sui-synthetic-ingestion/Cargo.toml create mode 100644 crates/sui-synthetic-ingestion/src/benchmark.rs create mode 100644 crates/sui-synthetic-ingestion/src/lib.rs create mode 100644 crates/sui-synthetic-ingestion/src/synthetic_ingestion.rs create mode 100644 crates/sui-synthetic-ingestion/src/tps_tracker.rs diff --git a/Cargo.lock b/Cargo.lock index 30614cbf7a27b..682170cccf01b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14974,6 +14974,19 @@ dependencies = [ "tracing", ] +[[package]] +name = "sui-synthetic-ingestion" +version = "0.0.0" +dependencies = [ + "async-trait", + "simulacrum", + "sui-test-transaction-builder", + "sui-types", + "tempfile", + "tokio", + "tracing", +] + [[package]] name = "sui-telemetry" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index a03184519f89e..e1ef97b2270b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -154,6 +154,7 @@ members = [ "crates/sui-surfer", "crates/sui-swarm", "crates/sui-swarm-config", + "crates/sui-synthetic-ingestion", "crates/sui-telemetry", "crates/sui-test-transaction-builder", "crates/sui-test-validator", @@ -669,6 +670,7 @@ sui-storage = { path = "crates/sui-storage" } sui-surfer = { path = "crates/sui-surfer" } sui-swarm = { path = "crates/sui-swarm" } sui-swarm-config = { path = "crates/sui-swarm-config" } +sui-synthetic-ingestion = { path = "crates/sui-synthetic-ingestion" } 
sui-telemetry = { path = "crates/sui-telemetry" } sui-test-transaction-builder = { path = "crates/sui-test-transaction-builder" } sui-test-validator = { path = "crates/sui-test-validator" } diff --git a/crates/simulacrum/src/lib.rs b/crates/simulacrum/src/lib.rs index 32ffab8beaca6..9e8024ac8469c 100644 --- a/crates/simulacrum/src/lib.rs +++ b/crates/simulacrum/src/lib.rs @@ -391,6 +391,12 @@ impl Simulacrum { .unwrap(); } + pub fn override_last_checkpoint_number(&mut self, number: CheckpointSequenceNumber) { + let committee = CommitteeWithKeys::new(&self.keystore, self.epoch_state.committee()); + self.checkpoint_builder + .override_last_checkpoint_number(number, &committee); + } + fn process_data_ingestion( &self, checkpoint: VerifiedCheckpoint, diff --git a/crates/sui-synthetic-ingestion/Cargo.toml b/crates/sui-synthetic-ingestion/Cargo.toml new file mode 100644 index 0000000000000..07391de552efe --- /dev/null +++ b/crates/sui-synthetic-ingestion/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "sui-synthetic-ingestion" +version = "0.0.0" +authors = ["Mysten Labs "] +license = "Apache-2.0" +publish = false +edition = "2021" + +[dependencies] +async-trait.workspace = true +simulacrum.workspace = true +sui-test-transaction-builder.workspace = true +sui-types = { workspace = true, features = ["test-utils"] } +tokio.workspace = true +tracing.workspace = true + +[dev-dependencies] +tempfile.workspace = true diff --git a/crates/sui-synthetic-ingestion/src/benchmark.rs b/crates/sui-synthetic-ingestion/src/benchmark.rs new file mode 100644 index 0000000000000..10c01d0053a4a --- /dev/null +++ b/crates/sui-synthetic-ingestion/src/benchmark.rs @@ -0,0 +1,150 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::synthetic_ingestion::generate_ingestion; +use crate::tps_tracker::TpsTracker; +use crate::{IndexerProgress, SyntheticIngestionConfig}; +use std::time::Duration; +use tokio::sync::watch; +use tracing::{error, info}; + +/// A trait that can be implemented on top of any indexer to benchmark its throughput. +/// It will generate synthetic transactions and checkpoints as ingestion source. +#[async_trait::async_trait] +pub trait BenchmarkableIndexer { + /// Allows the benchmark to subscribe and monitor the committed checkpoints progress. + /// This is needed both in order to log periodic throughput, but also + /// to know when the benchmark can stop. + fn subscribe_to_committed_checkpoints(&self) -> watch::Receiver>; + /// Start the indexer. Note that we only start a timer before calling this function. + /// So the implementation should only start the indexer when this function is called. + async fn start(&mut self); + /// Stop the indexer. This would allow the benchmark to exit. 
+ async fn stop(mut self); +} + +pub async fn run_benchmark( + config: SyntheticIngestionConfig, + mut indexer: I, +) -> u64 { + assert!( + config.starting_checkpoint > 0, + "Checkpoint 0 is reserved for genesis checkpoint" + ); + let expected_last_checkpoint = config.starting_checkpoint + config.num_checkpoints - 1; + generate_ingestion(config.clone()); + + let mut rx = indexer.subscribe_to_committed_checkpoints(); + let mut tps_tracker = TpsTracker::new(Duration::from_secs(1)); + info!("Starting benchmark..."); + indexer.start().await; + + loop { + if let Err(err) = rx.changed().await { + error!("Error polling from watch channel, exiting early: {:?}", err); + break; + } + let committed_checkpoint = rx.borrow_and_update().clone(); + if let Some(checkpoint) = committed_checkpoint { + tps_tracker.update(checkpoint.clone()); + if checkpoint.checkpoint == expected_last_checkpoint { + break; + } + } + } + let seq = tps_tracker.finish(); + indexer.stop().await; + seq +} + +#[cfg(test)] +mod test { + use crate::benchmark::{run_benchmark, BenchmarkableIndexer}; + use crate::{IndexerProgress, SyntheticIngestionConfig}; + use std::path::PathBuf; + use std::time::Duration; + use sui_types::messages_checkpoint::CheckpointSequenceNumber; + use tokio::sync::watch; + + struct MockIndexer { + starting_checkpoint: CheckpointSequenceNumber, + ingestion_dir: PathBuf, + committed_checkpoint_tx: Option>>, + committed_checkpoint_rx: watch::Receiver>, + } + + impl MockIndexer { + fn new(starting_checkpoint: CheckpointSequenceNumber, ingestion_dir: PathBuf) -> Self { + let (committed_checkpoint_tx, committed_checkpoint_rx) = watch::channel(None); + Self { + starting_checkpoint, + ingestion_dir, + committed_checkpoint_tx: Some(committed_checkpoint_tx), + committed_checkpoint_rx, + } + } + } + + #[async_trait::async_trait] + impl BenchmarkableIndexer for MockIndexer { + fn subscribe_to_committed_checkpoints(&self) -> watch::Receiver> { + self.committed_checkpoint_rx.clone() + } + + async 
fn start(&mut self) { + let tx = self.committed_checkpoint_tx.take().unwrap(); + let mut checkpoint = self.starting_checkpoint; + let dir = self.ingestion_dir.clone(); + tokio::task::spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(100)).await; + let path = dir.join(format!("{}.chk", checkpoint)); + if std::fs::metadata(&path).is_err() { + break; + } + tx.send(Some(IndexerProgress { + checkpoint, + network_total_transactions: 0, + })) + .unwrap(); + checkpoint += 1; + } + }); + } + + async fn stop(mut self) {} + } + + #[tokio::test] + async fn test_run_ingestion_benchmark() { + let tmp_dir = tempfile::tempdir().unwrap(); + let config = SyntheticIngestionConfig { + ingestion_dir: tmp_dir.path().to_path_buf(), + checkpoint_size: 10, + num_checkpoints: 10, + starting_checkpoint: 1, + }; + let indexer = MockIndexer::new(config.starting_checkpoint, tmp_dir.path().to_path_buf()); + let last_checkpoint = + tokio::time::timeout(Duration::from_secs(10), run_benchmark(config, indexer)) + .await + .unwrap(); + assert_eq!(last_checkpoint, 10); + } + #[tokio::test] + async fn test_run_ingestion_benchmark_custom_starting_checkpoint() { + let tmp_dir = tempfile::tempdir().unwrap(); + let config = SyntheticIngestionConfig { + ingestion_dir: tmp_dir.path().to_path_buf(), + checkpoint_size: 10, + num_checkpoints: 10, + starting_checkpoint: 1000, + }; + let indexer = MockIndexer::new(config.starting_checkpoint, tmp_dir.path().to_path_buf()); + let last_checkpoint = + tokio::time::timeout(Duration::from_secs(10), run_benchmark(config, indexer)) + .await + .unwrap(); + assert_eq!(last_checkpoint, 1009); + } +} diff --git a/crates/sui-synthetic-ingestion/src/lib.rs b/crates/sui-synthetic-ingestion/src/lib.rs new file mode 100644 index 0000000000000..d7f52b8d34b3a --- /dev/null +++ b/crates/sui-synthetic-ingestion/src/lib.rs @@ -0,0 +1,29 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0
+
+use std::path::PathBuf;
+use sui_types::messages_checkpoint::CheckpointSequenceNumber;
+
+pub mod benchmark;
+mod synthetic_ingestion;
+mod tps_tracker;
+
+#[derive(Clone, Debug)]
+pub struct SyntheticIngestionConfig {
+    /// Directory to write the ingestion data to.
+    pub ingestion_dir: PathBuf,
+    /// Number of transactions in a checkpoint.
+    pub checkpoint_size: u64,
+    /// Total number of synthetic checkpoints to generate.
+    pub num_checkpoints: u64,
+    /// Customize the first checkpoint sequence number to be committed.
+    /// This is useful if we want to benchmark on a non-empty database.
+    /// Note that this must be > 0, because the genesis checkpoint is always 0.
+    pub starting_checkpoint: CheckpointSequenceNumber,
+}
+
+#[derive(Clone, Debug)]
+pub struct IndexerProgress {
+    pub checkpoint: CheckpointSequenceNumber,
+    pub network_total_transactions: u64,
+}
diff --git a/crates/sui-synthetic-ingestion/src/synthetic_ingestion.rs b/crates/sui-synthetic-ingestion/src/synthetic_ingestion.rs
new file mode 100644
index 0000000000000..1ce4848506a84
--- /dev/null
+++ b/crates/sui-synthetic-ingestion/src/synthetic_ingestion.rs
@@ -0,0 +1,60 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::SyntheticIngestionConfig;
+use simulacrum::Simulacrum;
+use sui_test_transaction_builder::TestTransactionBuilder;
+use sui_types::crypto::get_account_key_pair;
+use sui_types::effects::TransactionEffectsAPI;
+use sui_types::gas_coin::MIST_PER_SUI;
+use sui_types::utils::to_sender_signed_transaction;
+use tracing::info;
+
+// TODO: Simulacrum does serial execution which could be slow if
+// we need to generate a large number of transactions.
+// We may want to make Simulacrum support parallel execution.
+
+/// Runs `num_checkpoints * checkpoint_size` self-transfer transactions through a fresh
+/// `Simulacrum` whose data ingestion path is `ingestion_dir`, creating one checkpoint per
+/// `checkpoint_size` transactions. The previous checkpoint number is overridden to
+/// `starting_checkpoint - 1`, so `starting_checkpoint` must be > 0.
+pub(crate) fn generate_ingestion(config: SyntheticIngestionConfig) {
+    info!("Generating synthetic ingestion data. config: {:?}", config);
+    let timer = std::time::Instant::now();
+    let mut sim = Simulacrum::new();
+    let SyntheticIngestionConfig {
+        ingestion_dir,
+        checkpoint_size,
+        num_checkpoints,
+        starting_checkpoint,
+    } = config;
+    sim.set_data_ingestion_path(ingestion_dir);
+    sim.override_last_checkpoint_number(starting_checkpoint - 1);
+
+    let gas_price = sim.reference_gas_price();
+    let (sender, keypair) = get_account_key_pair();
+    let effects = sim.request_gas(sender, MIST_PER_SUI * 1000000).unwrap();
+    let mut gas_object = effects.created()[0].0;
+    let mut tx_count = 0;
+    for i in 0..num_checkpoints {
+        for _ in 0..checkpoint_size {
+            let tx_data = TestTransactionBuilder::new(sender, gas_object, gas_price)
+                .transfer_sui(Some(1), sender)
+                .build();
+            let tx = to_sender_signed_transaction(tx_data, &keypair);
+            let (effects, _) = sim.execute_transaction(tx).unwrap();
+            gas_object = effects.gas_object().0;
+            tx_count += 1;
+        }
+        sim.create_checkpoint();
+        if (i + 1) % 100 == 0 {
+            info!("Generated {} checkpoints, {} transactions", i + 1, tx_count);
+        }
+    }
+    info!(
+        "Generated {} transactions in {} checkpoints. Total time: {:?}",
+        tx_count,
+        num_checkpoints,
+        timer.elapsed()
+    );
+}
diff --git a/crates/sui-synthetic-ingestion/src/tps_tracker.rs b/crates/sui-synthetic-ingestion/src/tps_tracker.rs
new file mode 100644
index 0000000000000..481e92e6fc8b9
--- /dev/null
+++ b/crates/sui-synthetic-ingestion/src/tps_tracker.rs
@@ -0,0 +1,122 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::IndexerProgress;
+use std::time::{Duration, Instant};
+use sui_types::messages_checkpoint::CheckpointSequenceNumber;
+use tracing::info;
+
+/// Tracks indexer progress reports and logs periodic and overall throughput, in
+/// transactions per second (TPS) and checkpoints per second (CPS).
+pub(crate) struct TpsTracker {
+    /// Time of the first progress update; the baseline for the overall averages.
+    start_time: Instant,
+    /// The first progress update received.
+    starting_state: Option<IndexerProgress>,
+
+    /// Time and state at the last periodic log (initially, at the first update).
+    prev_time: Instant,
+    prev_timed_state: Option<IndexerProgress>,
+
+    /// The most recent progress update.
+    cur_state: Option<IndexerProgress>,
+
+    /// Highest TPS observed over any logged interval.
+    peak_tps: f64,
+
+    /// Log time elapsed and TPS every log_frequency duration.
+    log_frequency: Duration,
+}
+
+impl TpsTracker {
+    pub fn new(log_frequency: Duration) -> Self {
+        let start_time = Instant::now();
+        Self {
+            start_time,
+            starting_state: None,
+            prev_time: start_time,
+            prev_timed_state: None,
+            cur_state: None,
+            peak_tps: 0.0,
+            log_frequency,
+        }
+    }
+
+    /// Returns `delta` per second of `elapsed`, or 0.0 if no time has elapsed, so that
+    /// a zero-length interval is not reported as `inf` or `NaN`.
+    fn rate(delta: u64, elapsed: Duration) -> f64 {
+        let secs = elapsed.as_secs_f64();
+        if secs > 0.0 {
+            delta as f64 / secs
+        } else {
+            0.0
+        }
+    }
+
+    /// Records a progress update. The first update only sets the baseline for the
+    /// averages; later updates log the TPS/CPS since the previous log once at least
+    /// `log_frequency` has elapsed.
+    pub fn update(&mut self, cur_state: IndexerProgress) {
+        self.cur_state = Some(cur_state.clone());
+        let cur_time = Instant::now();
+        let Some(prev_timed_state) = self.prev_timed_state.clone() else {
+            self.prev_time = cur_time;
+            self.prev_timed_state = Some(cur_state.clone());
+            self.start_time = cur_time;
+            self.starting_state = Some(cur_state);
+            return;
+        };
+        let elapsed = cur_time - self.prev_time;
+        if elapsed < self.log_frequency {
+            return;
+        }
+        // Saturating subtraction: a report that goes backwards must not underflow.
+        let tps = Self::rate(
+            cur_state
+                .network_total_transactions
+                .saturating_sub(prev_timed_state.network_total_transactions),
+            elapsed,
+        );
+        let cps = Self::rate(
+            cur_state
+                .checkpoint
+                .saturating_sub(prev_timed_state.checkpoint),
+            elapsed,
+        );
+        info!(
+            "Last processed checkpoint: {}, Current TPS: {:.2}, CPS: {:.2}",
+            cur_state.checkpoint, tps, cps
+        );
+        self.peak_tps = self.peak_tps.max(tps);
+        self.prev_time = cur_time;
+        self.prev_timed_state = Some(cur_state);
+    }
+
+    /// Logs the overall averages since the first update and returns the last reported
+    /// checkpoint. Returns 0 without panicking if `update` was never called.
+    pub fn finish(&mut self) -> CheckpointSequenceNumber {
+        let (Some(cur_state), Some(starting_state)) = (&self.cur_state, &self.starting_state)
+        else {
+            info!("Benchmark ended before any checkpoint was committed.");
+            return 0;
+        };
+        let elapsed = Instant::now() - self.start_time;
+        let tps = Self::rate(
+            cur_state
+                .network_total_transactions
+                .saturating_sub(starting_state.network_total_transactions),
+            elapsed,
+        );
+        let cps = Self::rate(
+            cur_state
+                .checkpoint
+                .saturating_sub(starting_state.checkpoint),
+            elapsed,
+        );
+        info!(
+            "Benchmark completed. Total time: {:?}, Average TPS: {:.2}, CPS: {:.2}. Peak TPS: {:.2}",
+            elapsed, tps, cps, self.peak_tps,
+        );
+        cur_state.checkpoint
+    }
+}
diff --git a/crates/sui-types/src/mock_checkpoint_builder.rs b/crates/sui-types/src/mock_checkpoint_builder.rs
index d97a186c46ce9..da086f770b53b 100644
--- a/crates/sui-types/src/mock_checkpoint_builder.rs
+++ b/crates/sui-types/src/mock_checkpoint_builder.rs
@@ -63,6 +63,17 @@ impl MockCheckpointBuilder {
             .push(VerifiedExecutionData::new(transaction, effects))
     }
 
+    pub fn override_last_checkpoint_number(
+        &mut self,
+        checkpoint_number: u64,
+        validator_keys: &impl ValidatorKeypairProvider,
+    ) {
+        let mut summary = self.previous_checkpoint.data().clone();
+        summary.sequence_number = checkpoint_number;
+        let checkpoint = Self::create_certified_checkpoint(validator_keys, summary);
+        self.previous_checkpoint = checkpoint;
+    }
+
     /// Builds a checkpoint using internally buffered transactions.
     pub fn build(
         &mut self,
From 52e42f02f753d8dcb1da8008b5222f8a03101f69 Mon Sep 17 00:00:00 2001
From: Xun Li
Date: Wed, 16 Oct 2024 21:21:34 -0700
Subject: [PATCH 2/2] Integrate sui-indexer with benchmark

---
 Cargo.lock | 1 +
 crates/sui-indexer/Cargo.toml | 3 +-
 crates/sui-indexer/src/benchmark.rs | 116 ++++++++++++++++++
 crates/sui-indexer/src/config.rs | 29 +++++
 .../src/handlers/checkpoint_handler.rs | 6 +-
 crates/sui-indexer/src/handlers/committer.rs | 39 ++++--
 crates/sui-indexer/src/indexer.rs | 15 ++-
 crates/sui-indexer/src/lib.rs | 1 +
 crates/sui-indexer/src/main.rs | 7 +-
 crates/sui-indexer/src/test_utils.rs | 3 +-
 10 files changed, 206 insertions(+), 14 deletions(-)
 create mode 100644 crates/sui-indexer/src/benchmark.rs

diff --git a/Cargo.lock b/Cargo.lock
index 682170cccf01b..2224e4e75c745 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13719,6 +13719,7 @@ dependencies = [
  "sui-sdk",
  "sui-snapshot",
  "sui-storage",
+ "sui-synthetic-ingestion",
  "sui-test-transaction-builder",
  "sui-transaction-builder",
  "sui-types",
diff --git a/crates/sui-indexer/Cargo.toml
b/crates/sui-indexer/Cargo.toml
index fa4490741b163..3470488d510f1 100644
--- a/crates/sui-indexer/Cargo.toml
+++ b/crates/sui-indexer/Cargo.toml
@@ -8,7 +8,7 @@ edition = "2021"
 
 [dependencies]
 anyhow.workspace = true
-rand = "0.8.5"
+rand = "0.8.5"
 async-trait.workspace = true
 axum.workspace = true
 backoff.workspace = true
@@ -64,6 +64,7 @@ sui-protocol-config.workspace = true
 telemetry-subscribers.workspace = true
 sui-rest-api.workspace = true
 sui-transaction-builder.workspace = true
+sui-synthetic-ingestion.workspace = true
 
 move-core-types.workspace = true
 move-bytecode-utils.workspace = true
diff --git a/crates/sui-indexer/src/benchmark.rs b/crates/sui-indexer/src/benchmark.rs
new file mode 100644
index 0000000000000..eafddd716a86e
--- /dev/null
+++ b/crates/sui-indexer/src/benchmark.rs
@@ -0,0 +1,152 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::config::{BenchmarkConfig, IngestionConfig, IngestionSources, UploadOptions};
+use crate::database::ConnectionPool;
+use crate::db::{reset_database, run_migrations};
+use crate::errors::IndexerError;
+use crate::indexer::Indexer;
+use crate::metrics::IndexerMetrics;
+use crate::store::PgIndexerStore;
+use std::path::PathBuf;
+use sui_synthetic_ingestion::benchmark::{run_benchmark, BenchmarkableIndexer};
+use sui_synthetic_ingestion::{IndexerProgress, SyntheticIngestionConfig};
+use tokio::sync::watch;
+use tokio::task::JoinHandle;
+use tokio_util::sync::CancellationToken;
+
+/// Runs the synthetic ingestion benchmark against a `PgIndexerStore` backed by `pool`.
+///
+/// The database is reset when `config.reset_db` is set and migrated otherwise. Synthetic
+/// checkpoints are generated into a temporary directory that is removed when the
+/// benchmark finishes.
+///
+/// # Panics
+/// Panics if `config.starting_checkpoint` or `config.num_checkpoints` is 0, or if
+/// preparing the database or creating the temporary directory fails.
+pub async fn run_indexer_benchmark(
+    config: BenchmarkConfig,
+    pool: ConnectionPool,
+    metrics: IndexerMetrics,
+) {
+    // Validate up front, before the database is touched: a zero starting checkpoint is
+    // rejected by `run_benchmark`, and with zero checkpoints there is nothing to wait for.
+    assert!(
+        config.starting_checkpoint > 0,
+        "Checkpoint 0 is reserved for genesis checkpoint"
+    );
+    assert!(
+        config.num_checkpoints > 0,
+        "At least one checkpoint must be generated"
+    );
+    if config.reset_db {
+        reset_database(pool.dedicated_connection().await.unwrap())
+            .await
+            .unwrap();
+    } else {
+        run_migrations(pool.dedicated_connection().await.unwrap())
+            .await
+            .unwrap();
+    }
+    let store = PgIndexerStore::new(pool, UploadOptions::default(), metrics.clone());
+    // Keep the `TempDir` guard alive until the benchmark is done so that the generated
+    // checkpoint files are deleted afterwards instead of being left behind on disk.
+    let ingestion_dir_guard = tempfile::tempdir().unwrap();
+    let ingestion_dir = ingestion_dir_guard.path().to_path_buf();
+    let synthetic_ingestion_config = SyntheticIngestionConfig {
+        ingestion_dir: ingestion_dir.clone(),
+        checkpoint_size: config.checkpoint_size,
+        num_checkpoints: config.num_checkpoints,
+        starting_checkpoint: config.starting_checkpoint,
+    };
+    let indexer = BenchmarkIndexer::new(store, metrics, ingestion_dir);
+    run_benchmark(synthetic_ingestion_config, indexer).await;
+}
+
+/// Adapter that lets the benchmark framework drive the indexer writer.
+pub struct BenchmarkIndexer {
+    /// State consumed by `start`; `None` once the writer has been started.
+    inner: Option<BenchmarkIndexerInner>,
+    /// Cancelled in `stop` to shut the writer down.
+    cancel: CancellationToken,
+    committed_checkpoints_rx: watch::Receiver<Option<IndexerProgress>>,
+    /// Handle of the spawned writer task; `None` until `start` is called.
+    handle: Option<JoinHandle<Result<(), IndexerError>>>,
+}
+
+struct BenchmarkIndexerInner {
+    ingestion_dir: PathBuf,
+    store: PgIndexerStore,
+    metrics: IndexerMetrics,
+    committed_checkpoints_tx: watch::Sender<Option<IndexerProgress>>,
+}
+
+impl BenchmarkIndexer {
+    pub fn new(store: PgIndexerStore, metrics: IndexerMetrics, ingestion_dir: PathBuf) -> Self {
+        let cancel = CancellationToken::new();
+        let (committed_checkpoints_tx, committed_checkpoints_rx) = watch::channel(None);
+        Self {
+            inner: Some(BenchmarkIndexerInner {
+                ingestion_dir,
+                store,
+                metrics,
+                committed_checkpoints_tx,
+            }),
+            cancel,
+            committed_checkpoints_rx,
+            handle: None,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl BenchmarkableIndexer for BenchmarkIndexer {
+    fn subscribe_to_committed_checkpoints(&self) -> watch::Receiver<Option<IndexerProgress>> {
+        self.committed_checkpoints_rx.clone()
+    }
+
+    async fn start(&mut self) {
+        let BenchmarkIndexerInner {
+            ingestion_dir,
+            store,
+            metrics,
+            committed_checkpoints_tx,
+        } = self
+            .inner
+            .take()
+            .expect("BenchmarkIndexer::start must only be called once");
+        let ingestion_config = IngestionConfig {
+            sources: IngestionSources {
+                data_ingestion_path: Some(ingestion_dir),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+        let cancel = self.cancel.clone();
+        let handle = tokio::task::spawn(async move {
+            Indexer::start_writer(
+                ingestion_config,
+                store,
+                metrics,
+                Default::default(),
+                None,
+                cancel,
+                Some(committed_checkpoints_tx),
+            )
+            .await
+        });
+        self.handle = Some(handle);
+    }
+
+    async fn stop(mut self) {
+        self.cancel.cancel();
+        // Nothing to wait for if the writer was never started.
+        let Some(handle) = self.handle.take() else {
+            return;
+        };
+        handle
+            .await
+            .expect("indexer writer task panicked")
+            .expect("indexer writer returned an error");
+    }
+}
diff
--git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs index f51d18ab1ff88..68b0aef539e27 100644 --- a/crates/sui-indexer/src/config.rs +++ b/crates/sui-indexer/src/config.rs @@ -210,6 +210,7 @@ pub enum Command { }, /// Restore the database from formal snaphots. Restore(RestoreConfig), + Benchmark(BenchmarkConfig), } #[derive(Args, Default, Debug, Clone)] @@ -378,6 +379,34 @@ impl Default for RestoreConfig { } } +#[derive(Args, Debug, Clone)] +pub struct BenchmarkConfig { + #[arg( + long, + default_value_t = 200, + help = "Number of transactions in a checkpoint." + )] + pub checkpoint_size: u64, + #[arg( + long, + default_value_t = 2000, + help = "Total number of synthetic checkpoints to generate." + )] + pub num_checkpoints: u64, + #[arg( + long, + default_value_t = 1, + help = "Customize the first checkpoint sequence number to be committed, must be non-zero." + )] + pub starting_checkpoint: u64, + #[arg( + long, + default_value_t = false, + help = "Whether to reset the database before running." 
+ )] + pub reset_db: bool, +} + #[cfg(test)] mod test { use super::*; diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index be4e0d375a923..3d045840a8764 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -14,6 +14,7 @@ use move_core_types::language_storage::{StructTag, TypeTag}; use mysten_metrics::{get_metrics, spawn_monitored_task}; use sui_data_ingestion_core::Worker; use sui_rest_api::{CheckpointData, CheckpointTransaction}; +use sui_synthetic_ingestion::IndexerProgress; use sui_types::dynamic_field::DynamicFieldType; use sui_types::effects::{ObjectChange, TransactionEffectsAPI}; use sui_types::event::SystemEpochInfoEvent; @@ -24,6 +25,7 @@ use sui_types::object::Object; use sui_types::object::Owner; use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait}; use sui_types::transaction::TransactionDataAPI; +use tokio::sync::watch; use crate::errors::IndexerError; use crate::handlers::committer::start_tx_checkpoint_commit_task; @@ -50,6 +52,7 @@ pub async fn new_handlers( metrics: IndexerMetrics, next_checkpoint_sequence_number: CheckpointSequenceNumber, cancel: CancellationToken, + committed_checkpoints_tx: Option>>, ) -> Result { let checkpoint_queue_size = std::env::var("CHECKPOINT_QUEUE_SIZE") .unwrap_or(CHECKPOINT_QUEUE_SIZE.to_string()) @@ -71,7 +74,8 @@ pub async fn new_handlers( metrics_clone, indexed_checkpoint_receiver, next_checkpoint_sequence_number, - cancel.clone() + cancel.clone(), + committed_checkpoints_tx )); Ok(CheckpointHandler::new( state, diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index ad9df09be4894..6c08fda947667 100644 --- a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -3,14 +3,16 @@ use std::collections::{BTreeMap, HashMap}; +use 
sui_synthetic_ingestion::IndexerProgress; +use sui_types::messages_checkpoint::CheckpointSequenceNumber; use tap::tap::TapFallible; +use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tracing::instrument; use tracing::{error, info}; -use sui_types::messages_checkpoint::CheckpointSequenceNumber; - use crate::metrics::IndexerMetrics; +use crate::models::raw_checkpoints::StoredRawCheckpoint; use crate::store::IndexerStore; use crate::types::IndexerResult; @@ -24,6 +26,7 @@ pub async fn start_tx_checkpoint_commit_task( tx_indexing_receiver: mysten_metrics::metered_channel::Receiver, mut next_checkpoint_sequence_number: CheckpointSequenceNumber, cancel: CancellationToken, + mut committed_checkpoints_tx: Option>>, ) -> IndexerResult<()> where S: IndexerStore + Clone + Sync + Send + 'static, @@ -60,7 +63,14 @@ where // The batch will consist of contiguous checkpoints and at most one epoch boundary at // the end. if batch.len() == checkpoint_commit_batch_size || epoch.is_some() { - commit_checkpoints(&state, batch, epoch, &metrics).await; + commit_checkpoints( + &state, + batch, + epoch, + &metrics, + &mut committed_checkpoints_tx, + ) + .await; batch = vec![]; } if let Some(epoch_number) = epoch_number_option { @@ -74,7 +84,7 @@ where } } if !batch.is_empty() { - commit_checkpoints(&state, batch, None, &metrics).await; + commit_checkpoints(&state, batch, None, &metrics, &mut committed_checkpoints_tx).await; batch = vec![]; } } @@ -95,6 +105,7 @@ async fn commit_checkpoints( indexed_checkpoint_batch: Vec, epoch: Option, metrics: &IndexerMetrics, + committed_checkpoints_tx: &mut Option>>, ) where S: IndexerStore + Clone + Sync + Send + 'static, { @@ -135,8 +146,13 @@ async fn commit_checkpoints( packages_batch.push(packages); } - let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number; - let committer_watermark = CommitterWatermark::from(checkpoint_batch.last().unwrap()); + let first_checkpoint_seq = 
checkpoint_batch.first().unwrap().sequence_number; + let last_checkpoint = checkpoint_batch.last().unwrap(); + let indexer_progress = IndexerProgress { + checkpoint: last_checkpoint.sequence_number, + network_total_transactions: last_checkpoint.network_total_transactions, + }; + let committer_watermark = CommitterWatermark::from(last_checkpoint); let guard = metrics.checkpoint_db_commit_latency.start_timer(); let tx_batch = tx_batch.into_iter().flatten().collect::>(); @@ -156,7 +172,7 @@ async fn commit_checkpoints( let raw_checkpoints_batch = checkpoint_batch .iter() .map(|c| c.into()) - .collect::>(); + .collect::>(); { let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer(); @@ -266,4 +282,13 @@ async fn commit_checkpoints( metrics .thousand_transaction_avg_db_commit_latency .observe(elapsed * 1000.0 / tx_count as f64); + + if let Some(committed_checkpoints_tx) = committed_checkpoints_tx.as_mut() { + if let Err(err) = committed_checkpoints_tx.send(Some(indexer_progress)) { + error!( + "Failed to send committed checkpoints to the watch channel: {}", + err + ); + } + } } diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs index 240e295179094..987382fbd3f5d 100644 --- a/crates/sui-indexer/src/indexer.rs +++ b/crates/sui-indexer/src/indexer.rs @@ -6,7 +6,7 @@ use std::env; use anyhow::Result; use prometheus::Registry; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, watch}; use tokio_util::sync::CancellationToken; use tracing::info; @@ -16,6 +16,7 @@ use mysten_metrics::spawn_monitored_task; use sui_data_ingestion_core::{ DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, WorkerPool, }; +use sui_synthetic_ingestion::IndexerProgress; use sui_types::messages_checkpoint::CheckpointSequenceNumber; use crate::build_json_rpc_server; @@ -33,12 +34,13 @@ pub struct Indexer; impl Indexer { pub async fn start_writer( - config: &IngestionConfig, + config: IngestionConfig, store: PgIndexerStore, 
metrics: IndexerMetrics, snapshot_config: SnapshotLagConfig, retention_config: Option, cancel: CancellationToken, + committed_checkpoints_tx: Option>>, ) -> Result<(), IndexerError> { info!( "Sui Indexer Writer (version {:?}) started...", @@ -98,7 +100,14 @@ impl Indexer { 2, DataIngestionMetrics::new(&Registry::new()), ); - let worker = new_handlers(store, metrics, primary_watermark, cancel.clone()).await?; + let worker = new_handlers( + store, + metrics, + primary_watermark, + cancel.clone(), + committed_checkpoints_tx, + ) + .await?; let worker_pool = WorkerPool::new( worker, "primary".to_string(), diff --git a/crates/sui-indexer/src/lib.rs b/crates/sui-indexer/src/lib.rs index e759370c72798..f40b0fdfcfb8a 100644 --- a/crates/sui-indexer/src/lib.rs +++ b/crates/sui-indexer/src/lib.rs @@ -27,6 +27,7 @@ use errors::IndexerError; pub mod apis; pub mod backfill; +pub mod benchmark; pub mod config; pub mod database; pub mod db; diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 8978d072d8dea..d5281b3dffd6d 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -3,6 +3,7 @@ use clap::Parser; use sui_indexer::backfill::backfill_runner::BackfillRunner; +use sui_indexer::benchmark::run_indexer_benchmark; use sui_indexer::config::{Command, UploadOptions}; use sui_indexer::database::ConnectionPool; use sui_indexer::db::{ @@ -55,12 +56,13 @@ async fn main() -> anyhow::Result<()> { let store = PgIndexerStore::new(pool, upload_options, indexer_metrics.clone()); Indexer::start_writer( - &ingestion_config, + ingestion_config, store, indexer_metrics, snapshot_config, retention_config, CancellationToken::new(), + None, ) .await?; } @@ -98,6 +100,9 @@ async fn main() -> anyhow::Result<()> { IndexerFormalSnapshotRestorer::new(store, restore_config).await?; formal_restorer.restore().await?; } + Command::Benchmark(benchmark_config) => { + run_indexer_benchmark(benchmark_config, pool, indexer_metrics).await; + } } Ok(()) 
diff --git a/crates/sui-indexer/src/test_utils.rs b/crates/sui-indexer/src/test_utils.rs index 6a208f8e4c6db..40ad5cab4f4f0 100644 --- a/crates/sui-indexer/src/test_utils.rs +++ b/crates/sui-indexer/src/test_utils.rs @@ -123,12 +123,13 @@ pub async fn start_indexer_writer_for_testing( tokio::spawn(async move { Indexer::start_writer( - &ingestion_config, + ingestion_config, store_clone, indexer_metrics, snapshot_config, retention_config, token_clone, + None, ) .await })