Skip to content

Commit

Permalink
Integrate sui-indexer with benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Oct 17, 2024
1 parent 1d33387 commit f548742
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/sui-data-ingestion-core/src/worker_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl<W: Worker + 'static> WorkerPool<W> {

// Wait for all workers to finish
for join_handle in join_handles {
join_handle.await.expect("worker thread panicked");
let _ = join_handle.await;
}
}
}
3 changes: 2 additions & 1 deletion crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"

[dependencies]
anyhow.workspace = true
rand = "0.8.5"
rand = "0.8.5"
async-trait.workspace = true
axum.workspace = true
backoff.workspace = true
Expand Down Expand Up @@ -64,6 +64,7 @@ sui-protocol-config.workspace = true
telemetry-subscribers.workspace = true
sui-rest-api.workspace = true
sui-transaction-builder.workspace = true
sui-synthetic-ingestion.workspace = true

move-core-types.workspace = true
move-bytecode-utils.workspace = true
Expand Down
115 changes: 115 additions & 0 deletions crates/sui-indexer/src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::config::{BenchmarkConfig, IngestionConfig, IngestionSources, UploadOptions};
use crate::database::ConnectionPool;
use crate::db::{reset_database, run_migrations};
use crate::errors::IndexerError;
use crate::indexer::Indexer;
use crate::metrics::IndexerMetrics;
use crate::store::PgIndexerStore;
use std::path::PathBuf;
use sui_synthetic_ingestion::benchmark::{run_benchmark, BenchmarkableIndexer};
use sui_synthetic_ingestion::{IndexerProgress, SyntheticIngestionConfig};
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

/// Runs the synthetic-ingestion benchmark against this indexer.
///
/// The database is prepared first: a full reset when `config.reset_db` is set, otherwise
/// only the migrations are run. A [`BenchmarkIndexer`] reading from a fresh temporary
/// ingestion directory is then handed to `run_benchmark` together with the synthetic
/// workload settings taken from `config`.
///
/// # Panics
///
/// Panics if a dedicated connection cannot be obtained, if the reset/migration step fails,
/// or if the temporary directory cannot be created.
pub async fn run_indexer_benchmark(
    config: BenchmarkConfig,
    pool: ConnectionPool,
    metrics: IndexerMetrics,
) {
    // Both preparation paths start from a dedicated connection, so acquire it once.
    let connection = pool.dedicated_connection().await.unwrap();
    if config.reset_db {
        reset_database(connection).await.unwrap();
    } else {
        run_migrations(connection).await.unwrap();
    }

    let store = PgIndexerStore::new(pool, UploadOptions::default(), metrics.clone());

    // `into_path` yields a plain `PathBuf`; the directory is not removed automatically.
    let workload_dir = tempfile::tempdir().unwrap().into_path();
    let workload = SyntheticIngestionConfig {
        ingestion_dir: workload_dir.clone(),
        checkpoint_size: config.checkpoint_size,
        num_checkpoints: config.num_checkpoints,
    };

    run_benchmark(workload, BenchmarkIndexer::new(store, metrics, workload_dir)).await;
}

/// Adapter that lets the synthetic-ingestion benchmark drive the indexer writer.
pub struct BenchmarkIndexer {
    /// Everything the writer task needs; moved out by `start`, so `None` afterwards.
    inner: Option<BenchmarkIndexerInner>,
    /// Cancelled by `stop`; `start` passes a clone of it to the writer.
    cancel: CancellationToken,
    /// Receiving half of the progress channel; cloned for each subscriber.
    committed_checkpoints_rx: watch::Receiver<Option<IndexerProgress>>,
    /// Join handle of the spawned writer task; `None` until `start` has run.
    handle: Option<JoinHandle<Result<(), IndexerError>>>,
}

/// State held by `BenchmarkIndexer` until `start` moves it into the spawned writer task.
struct BenchmarkIndexerInner {
/// Directory used as the writer's `data_ingestion_path`.
ingestion_dir: PathBuf,
/// Store passed to `Indexer::start_writer`.
store: PgIndexerStore,
/// Metrics passed to `Indexer::start_writer`.
metrics: IndexerMetrics,
/// Sending half of the progress watch channel, passed to `Indexer::start_writer`.
committed_checkpoints_tx: watch::Sender<Option<IndexerProgress>>,
}

impl BenchmarkIndexer {
    /// Creates an indexer that has not been started yet.
    ///
    /// A watch channel initialised to `None` is created here: the sender is stored with
    /// the rest of the writer inputs, the receiver is kept for subscribers. No task is
    /// spawned until `start` is called.
    pub fn new(store: PgIndexerStore, metrics: IndexerMetrics, ingestion_dir: PathBuf) -> Self {
        let (progress_tx, progress_rx) = watch::channel(None);
        let pending = BenchmarkIndexerInner {
            ingestion_dir,
            store,
            metrics,
            committed_checkpoints_tx: progress_tx,
        };
        Self {
            inner: Some(pending),
            cancel: CancellationToken::new(),
            committed_checkpoints_rx: progress_rx,
            handle: None,
        }
    }
}

#[async_trait::async_trait]
impl BenchmarkableIndexer for BenchmarkIndexer {
    /// Returns a fresh receiver on the progress watch channel.
    fn subscribe_to_committed_checkpoints(&self) -> watch::Receiver<Option<IndexerProgress>> {
        self.committed_checkpoints_rx.clone()
    }

    /// Spawns the indexer writer as a background task reading from the ingestion directory.
    ///
    /// # Panics
    ///
    /// Panics if called more than once: the writer inputs are moved into the task on the
    /// first call.
    async fn start(&mut self) {
        let BenchmarkIndexerInner {
            ingestion_dir,
            store,
            metrics,
            committed_checkpoints_tx,
        } = self
            .inner
            .take()
            .expect("BenchmarkIndexer::start must be called at most once");
        // Only the local ingestion path is set; every other ingestion option keeps its default.
        let ingestion_config = IngestionConfig {
            sources: IngestionSources {
                data_ingestion_path: Some(ingestion_dir),
                ..Default::default()
            },
            ..Default::default()
        };
        let cancel = self.cancel.clone();
        let handle = tokio::task::spawn(async move {
            Indexer::start_writer(
                ingestion_config,
                store,
                metrics,
                Default::default(),
                None,
                cancel,
                Some(committed_checkpoints_tx),
            )
            .await
        });
        self.handle = Some(handle);
    }

    /// Cancels the writer and waits for its task to finish.
    ///
    /// # Panics
    ///
    /// Panics if `start` was never called, if the writer task panicked or was aborted, or
    /// if the writer returned an error.
    async fn stop(mut self) {
        self.cancel.cancel();
        self.handle
            .take()
            .expect("BenchmarkIndexer::stop called before start")
            .await
            .expect("indexer writer task panicked or was aborted")
            .expect("indexer writer exited with an error");
    }
}
23 changes: 23 additions & 0 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ pub enum Command {
},
/// Restore the database from formal snapshots.
Restore(RestoreConfig),
Benchmark(BenchmarkConfig),
}

#[derive(Args, Default, Debug, Clone)]
Expand Down Expand Up @@ -378,6 +379,28 @@ impl Default for RestoreConfig {
}
}

// Command-line options for the `Benchmark` subcommand; the fields are read by
// `run_indexer_benchmark`.
// NOTE(review): plain `//` comments are used here on purpose, since clap derive may
// surface `///` doc comments in the generated help output — confirm before converting.
#[derive(Args, Debug, Clone)]
pub struct BenchmarkConfig {
// Copied into `SyntheticIngestionConfig::checkpoint_size`; defaults to 200.
#[arg(
long,
default_value_t = 200,
help = "Number of transactions in a checkpoint."
)]
pub checkpoint_size: u64,
// Copied into `SyntheticIngestionConfig::num_checkpoints`; defaults to 2000.
#[arg(
long,
default_value_t = 2000,
help = "Total number of synthetic checkpoints to generate."
)]
pub num_checkpoints: u64,
// When set, the benchmark calls `reset_database` instead of only running migrations.
#[arg(
long,
default_value_t = false,
help = "Whether to reset the database before running."
)]
pub reset_db: bool,
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
6 changes: 5 additions & 1 deletion crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use move_core_types::language_storage::{StructTag, TypeTag};
use mysten_metrics::{get_metrics, spawn_monitored_task};
use sui_data_ingestion_core::Worker;
use sui_rest_api::{CheckpointData, CheckpointTransaction};
use sui_synthetic_ingestion::IndexerProgress;
use sui_types::dynamic_field::DynamicFieldType;
use sui_types::effects::{ObjectChange, TransactionEffectsAPI};
use sui_types::event::SystemEpochInfoEvent;
Expand All @@ -24,6 +25,7 @@ use sui_types::object::Object;
use sui_types::object::Owner;
use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait};
use sui_types::transaction::TransactionDataAPI;
use tokio::sync::watch;

use crate::errors::IndexerError;
use crate::handlers::committer::start_tx_checkpoint_commit_task;
Expand All @@ -50,6 +52,7 @@ pub async fn new_handlers(
metrics: IndexerMetrics,
next_checkpoint_sequence_number: CheckpointSequenceNumber,
cancel: CancellationToken,
committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
) -> Result<CheckpointHandler, IndexerError> {
let checkpoint_queue_size = std::env::var("CHECKPOINT_QUEUE_SIZE")
.unwrap_or(CHECKPOINT_QUEUE_SIZE.to_string())
Expand All @@ -71,7 +74,8 @@ pub async fn new_handlers(
metrics_clone,
indexed_checkpoint_receiver,
next_checkpoint_sequence_number,
cancel.clone()
cancel.clone(),
committed_checkpoints_tx
));
Ok(CheckpointHandler::new(
state,
Expand Down
39 changes: 32 additions & 7 deletions crates/sui-indexer/src/handlers/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@

use std::collections::{BTreeMap, HashMap};

use sui_synthetic_ingestion::IndexerProgress;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::tap::TapFallible;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use tracing::{error, info};

use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::metrics::IndexerMetrics;
use crate::models::raw_checkpoints::StoredRawCheckpoint;
use crate::store::IndexerStore;
use crate::types::IndexerResult;

Expand All @@ -24,6 +26,7 @@ pub async fn start_tx_checkpoint_commit_task<S>(
tx_indexing_receiver: mysten_metrics::metered_channel::Receiver<CheckpointDataToCommit>,
mut next_checkpoint_sequence_number: CheckpointSequenceNumber,
cancel: CancellationToken,
mut committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
) -> IndexerResult<()>
where
S: IndexerStore + Clone + Sync + Send + 'static,
Expand Down Expand Up @@ -60,7 +63,14 @@ where
// The batch will consist of contiguous checkpoints and at most one epoch boundary at
// the end.
if batch.len() == checkpoint_commit_batch_size || epoch.is_some() {
commit_checkpoints(&state, batch, epoch, &metrics).await;
commit_checkpoints(
&state,
batch,
epoch,
&metrics,
&mut committed_checkpoints_tx,
)
.await;
batch = vec![];
}
if let Some(epoch_number) = epoch_number_option {
Expand All @@ -74,7 +84,7 @@ where
}
}
if !batch.is_empty() {
commit_checkpoints(&state, batch, None, &metrics).await;
commit_checkpoints(&state, batch, None, &metrics, &mut committed_checkpoints_tx).await;
batch = vec![];
}
}
Expand All @@ -95,6 +105,7 @@ async fn commit_checkpoints<S>(
indexed_checkpoint_batch: Vec<CheckpointDataToCommit>,
epoch: Option<EpochToCommit>,
metrics: &IndexerMetrics,
committed_checkpoints_tx: &mut Option<watch::Sender<Option<IndexerProgress>>>,
) where
S: IndexerStore + Clone + Sync + Send + 'static,
{
Expand Down Expand Up @@ -135,8 +146,13 @@ async fn commit_checkpoints<S>(
packages_batch.push(packages);
}

let first_checkpoint_seq = checkpoint_batch.first().as_ref().unwrap().sequence_number;
let committer_watermark = CommitterWatermark::from(checkpoint_batch.last().unwrap());
let first_checkpoint_seq = checkpoint_batch.first().unwrap().sequence_number;
let last_checkpoint = checkpoint_batch.last().unwrap();
let indexer_progress = IndexerProgress {
checkpoint: last_checkpoint.sequence_number,
network_total_transactions: last_checkpoint.network_total_transactions,
};
let committer_watermark = CommitterWatermark::from(last_checkpoint);

let guard = metrics.checkpoint_db_commit_latency.start_timer();
let tx_batch = tx_batch.into_iter().flatten().collect::<Vec<_>>();
Expand All @@ -156,7 +172,7 @@ async fn commit_checkpoints<S>(
let raw_checkpoints_batch = checkpoint_batch
.iter()
.map(|c| c.into())
.collect::<Vec<_>>();
.collect::<Vec<StoredRawCheckpoint>>();

{
let _step_1_guard = metrics.checkpoint_db_commit_latency_step_1.start_timer();
Expand Down Expand Up @@ -266,4 +282,13 @@ async fn commit_checkpoints<S>(
metrics
.thousand_transaction_avg_db_commit_latency
.observe(elapsed * 1000.0 / tx_count as f64);

if let Some(committed_checkpoints_tx) = committed_checkpoints_tx.as_mut() {
if let Err(err) = committed_checkpoints_tx.send(Some(indexer_progress)) {
error!(
"Failed to send committed checkpoints to the watch channel: {}",
err
);
}
}
}
15 changes: 12 additions & 3 deletions crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::env;

use anyhow::Result;
use prometheus::Registry;
use tokio::sync::oneshot;
use tokio::sync::{oneshot, watch};
use tokio_util::sync::CancellationToken;
use tracing::info;

Expand All @@ -16,6 +16,7 @@ use mysten_metrics::spawn_monitored_task;
use sui_data_ingestion_core::{
DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, WorkerPool,
};
use sui_synthetic_ingestion::IndexerProgress;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

use crate::build_json_rpc_server;
Expand All @@ -33,12 +34,13 @@ pub struct Indexer;

impl Indexer {
pub async fn start_writer(
config: &IngestionConfig,
config: IngestionConfig,
store: PgIndexerStore,
metrics: IndexerMetrics,
snapshot_config: SnapshotLagConfig,
retention_config: Option<RetentionConfig>,
cancel: CancellationToken,
committed_checkpoints_tx: Option<watch::Sender<Option<IndexerProgress>>>,
) -> Result<(), IndexerError> {
info!(
"Sui Indexer Writer (version {:?}) started...",
Expand Down Expand Up @@ -98,7 +100,14 @@ impl Indexer {
2,
DataIngestionMetrics::new(&Registry::new()),
);
let worker = new_handlers(store, metrics, primary_watermark, cancel.clone()).await?;
let worker = new_handlers(
store,
metrics,
primary_watermark,
cancel.clone(),
committed_checkpoints_tx,
)
.await?;
let worker_pool = WorkerPool::new(
worker,
"primary".to_string(),
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use errors::IndexerError;

pub mod apis;
pub mod backfill;
pub mod benchmark;
pub mod config;
pub mod database;
pub mod db;
Expand Down
Loading

0 comments on commit f548742

Please sign in to comment.