diff --git a/crates/iota-indexer/src/handlers/address_metrics_processor.rs b/crates/iota-indexer/src/handlers/address_metrics_processor.rs
new file mode 100644
index 00000000000..1f473de6565
--- /dev/null
+++ b/crates/iota-indexer/src/handlers/address_metrics_processor.rs
@@ -0,0 +1,124 @@
+// Copyright (c) Mysten Labs, Inc.
+// Modifications Copyright (c) 2024 IOTA Stiftung
+// SPDX-License-Identifier: Apache-2.0
+
+use tap::tap::TapFallible;
+use tracing::{error, info};
+
+use crate::metrics::IndexerMetrics;
+use crate::store::IndexerAnalyticalStore;
+use crate::types::IndexerResult;
+
+const ADDRESS_PROCESSOR_BATCH_SIZE: usize = 80000;
+const PARALLELISM: usize = 10;
+
+pub struct AddressMetricsProcessor<S> {
+    pub store: S,
+    metrics: IndexerMetrics,
+    pub address_processor_batch_size: usize,
+    pub address_processor_parallelism: usize,
+}
+
+impl<S> AddressMetricsProcessor<S>
+where
+    S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
+{
+    pub fn new(store: S, metrics: IndexerMetrics) -> AddressMetricsProcessor<S> {
+        let address_processor_batch_size = std::env::var("ADDRESS_PROCESSOR_BATCH_SIZE")
+            .map(|s| s.parse::<usize>().unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE))
+            .unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE);
+        let address_processor_parallelism = std::env::var("ADDRESS_PROCESSOR_PARALLELISM")
+            .map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
+            .unwrap_or(PARALLELISM);
+        Self {
+            store,
+            metrics,
+            address_processor_batch_size,
+            address_processor_parallelism,
+        }
+    }
+
+    pub async fn start(&self) -> IndexerResult<()> {
+        info!("Indexer address metrics async processor started...");
+        let latest_tx_seq = self
+            .store
+            .get_address_metrics_last_processed_tx_seq()
+            .await?;
+        let mut last_processed_tx_seq = latest_tx_seq.unwrap_or_default().seq;
+        loop {
+            let mut latest_tx = self.store.get_latest_stored_transaction().await?;
+            while if let Some(tx) = latest_tx {
+                tx.tx_sequence_number
+                    < last_processed_tx_seq + self.address_processor_batch_size as i64
+            } else {
+                true
+            } {
+                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                latest_tx = self.store.get_latest_stored_transaction().await?;
+            }
+
+            let mut persist_tasks = vec![];
+            let batch_size = self.address_processor_batch_size;
+            let step_size = batch_size / self.address_processor_parallelism;
+            for chunk_start_tx_seq in (last_processed_tx_seq + 1
+                ..last_processed_tx_seq + batch_size as i64 + 1)
+                .step_by(step_size)
+            {
+                let active_address_store = self.store.clone();
+                persist_tasks.push(tokio::task::spawn_blocking(move || {
+                    active_address_store.persist_active_addresses_in_tx_range(
+                        chunk_start_tx_seq,
+                        chunk_start_tx_seq + step_size as i64,
+                    )
+                }));
+            }
+            for chunk_start_tx_seq in (last_processed_tx_seq + 1
+                ..last_processed_tx_seq + batch_size as i64 + 1)
+                .step_by(step_size)
+            {
+                let address_store = self.store.clone();
+                persist_tasks.push(tokio::task::spawn_blocking(move || {
+                    address_store.persist_addresses_in_tx_range(
+                        chunk_start_tx_seq,
+                        chunk_start_tx_seq + step_size as i64,
+                    )
+                }));
+            }
+            futures::future::join_all(persist_tasks)
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()
+                .tap_err(|e| {
+                    error!("Error joining address persist tasks: {:?}", e);
+                })?
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()
+                .tap_err(|e| {
+                    error!("Error persisting addresses or active addresses: {:?}", e);
+                })?;
+            last_processed_tx_seq += self.address_processor_batch_size as i64;
+            info!(
+                "Persisted addresses and active addresses for tx seq: {}",
+                last_processed_tx_seq,
+            );
+            self.metrics
+                .latest_address_metrics_tx_seq
+                .set(last_processed_tx_seq);
+
+            let mut last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?;
+            while last_processed_tx.is_none() {
+                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?;
+            }
+            // unwrap is safe here b/c we just checked that it's not None
+            let last_processed_cp = last_processed_tx.unwrap().checkpoint_sequence_number;
+            self.store
+                .calculate_and_persist_address_metrics(last_processed_cp)
+                .await?;
+            info!(
+                "Persisted address metrics for checkpoint: {}",
+                last_processed_cp
+            );
+        }
+    }
+}
\ No newline at end of file
diff --git a/crates/iota-indexer/src/handlers/mod.rs b/crates/iota-indexer/src/handlers/mod.rs
index 5839be69eee..b7acbddead7 100644
--- a/crates/iota-indexer/src/handlers/mod.rs
+++ b/crates/iota-indexer/src/handlers/mod.rs
@@ -16,6 +16,10 @@ pub mod checkpoint_handler;
 pub mod committer;
 pub mod objects_snapshot_processor;
 pub mod tx_processor;
+pub mod address_metrics_processor;
+pub mod move_call_metrics_processor;
+pub mod network_metrics_processor;
+pub mod processor_orchestrator;
 
 #[derive(Debug)]
 pub struct CheckpointDataToCommit {
diff --git a/crates/iota-indexer/src/handlers/move_call_metrics_processor.rs b/crates/iota-indexer/src/handlers/move_call_metrics_processor.rs
new file mode 100644
index 00000000000..a631507f180
--- /dev/null
+++ b/crates/iota-indexer/src/handlers/move_call_metrics_processor.rs
@@ -0,0 +1,113 @@
+// Copyright (c) Mysten Labs, Inc.
+// Modifications Copyright (c) 2024 IOTA Stiftung
+// SPDX-License-Identifier: Apache-2.0
+
+use tap::tap::TapFallible;
+use tracing::{error, info};
+
+use crate::metrics::IndexerMetrics;
+use crate::store::IndexerAnalyticalStore;
+use crate::types::IndexerResult;
+
+const MOVE_CALL_PROCESSOR_BATCH_SIZE: usize = 80000;
+const PARALLELISM: usize = 10;
+
+pub struct MoveCallMetricsProcessor<S> {
+    pub store: S,
+    metrics: IndexerMetrics,
+    pub move_call_processor_batch_size: usize,
+    pub move_call_processor_parallelism: usize,
+}
+
+impl<S> MoveCallMetricsProcessor<S>
+where
+    S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
+{
+    pub fn new(store: S, metrics: IndexerMetrics) -> MoveCallMetricsProcessor<S> {
+        let move_call_processor_batch_size = std::env::var("MOVE_CALL_PROCESSOR_BATCH_SIZE")
+            .map(|s| s.parse::<usize>().unwrap_or(MOVE_CALL_PROCESSOR_BATCH_SIZE))
+            .unwrap_or(MOVE_CALL_PROCESSOR_BATCH_SIZE);
+        let move_call_processor_parallelism = std::env::var("MOVE_CALL_PROCESSOR_PARALLELISM")
+            .map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
+            .unwrap_or(PARALLELISM);
+        Self {
+            store,
+            metrics,
+            move_call_processor_batch_size,
+            move_call_processor_parallelism,
+        }
+    }
+
+    pub async fn start(&self) -> IndexerResult<()> {
+        info!("Indexer move call metrics async processor started...");
+        let latest_move_call_tx_seq = self.store.get_latest_move_call_tx_seq().await?;
+        let mut last_processed_tx_seq = latest_move_call_tx_seq.unwrap_or_default().seq;
+        let latest_move_call_epoch = self.store.get_latest_move_call_metrics().await?;
+        let mut last_processed_epoch = latest_move_call_epoch.unwrap_or_default().epoch;
+        loop {
+            let mut latest_tx = self.store.get_latest_stored_transaction().await?;
+            while if let Some(tx) = latest_tx {
+                tx.tx_sequence_number
+                    < last_processed_tx_seq + self.move_call_processor_batch_size as i64
+            } else {
+                true
+            } {
+                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                latest_tx = self.store.get_latest_stored_transaction().await?;
+            }
+
+            let batch_size = self.move_call_processor_batch_size;
+            let step_size = batch_size / self.move_call_processor_parallelism;
+            let mut persist_tasks = vec![];
+            for chunk_start_tx_seq in (last_processed_tx_seq + 1
+                ..last_processed_tx_seq + batch_size as i64 + 1)
+                .step_by(step_size)
+            {
+                let move_call_store = self.store.clone();
+                persist_tasks.push(tokio::task::spawn_blocking(move || {
+                    move_call_store.persist_move_calls_in_tx_range(
+                        chunk_start_tx_seq,
+                        chunk_start_tx_seq + step_size as i64,
+                    )
+                }));
+            }
+            futures::future::join_all(persist_tasks)
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()
+                .tap_err(|e| {
+                    error!("Error joining move call persist tasks: {:?}", e);
+                })?
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()
+                .tap_err(|e| {
+                    error!("Error persisting move calls: {:?}", e);
+                })?;
+            last_processed_tx_seq += batch_size as i64;
+            info!("Persisted move_calls at tx seq: {}", last_processed_tx_seq);
+            self.metrics
+                .latest_move_call_metrics_tx_seq
+                .set(last_processed_tx_seq);
+
+            let mut tx = self.store.get_tx(last_processed_tx_seq).await?;
+            while tx.is_none() {
+                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                tx = self.store.get_tx(last_processed_tx_seq).await?;
+            }
+            let cp_seq = tx.unwrap().checkpoint_sequence_number;
+            let mut cp = self.store.get_cp(cp_seq).await?;
+            while cp.is_none() {
+                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                cp = self.store.get_cp(cp_seq).await?;
+            }
+            let end_epoch = cp.unwrap().epoch;
+            for epoch in last_processed_epoch + 1..end_epoch {
+                self.store
+                    .calculate_and_persist_move_call_metrics(epoch)
+                    .await?;
+                info!("Persisted move_call_metrics for epoch: {}", epoch);
+            }
+            last_processed_epoch = end_epoch - 1;
+        }
+    }
+}
\ No newline at end of file
diff --git a/crates/iota-indexer/src/handlers/network_metrics_processor.rs b/crates/iota-indexer/src/handlers/network_metrics_processor.rs
new file mode 100644
index 00000000000..e046ef8c84c
--- /dev/null
+++ b/crates/iota-indexer/src/handlers/network_metrics_processor.rs
@@ -0,0 +1,129 @@
+// Copyright (c) Mysten Labs, Inc.
+// Modifications Copyright (c) 2024 IOTA Stiftung
+// SPDX-License-Identifier: Apache-2.0
+
+use tap::tap::TapFallible;
+use tracing::{error, info};
+
+use crate::errors::IndexerError;
+use crate::metrics::IndexerMetrics;
+use crate::store::IndexerAnalyticalStore;
+use crate::types::IndexerResult;
+
+const NETWORK_METRICS_PROCESSOR_BATCH_SIZE: usize = 10;
+const PARALLELISM: usize = 1;
+
+pub struct NetworkMetricsProcessor<S> {
+    pub store: S,
+    metrics: IndexerMetrics,
+    pub network_processor_metrics_batch_size: usize,
+    pub network_processor_metrics_parallelism: usize,
+}
+
+impl<S> NetworkMetricsProcessor<S>
+where
+    S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
+{
+    pub fn new(store: S, metrics: IndexerMetrics) -> NetworkMetricsProcessor<S> {
+        let network_processor_metrics_batch_size =
+            std::env::var("NETWORK_PROCESSOR_METRICS_BATCH_SIZE")
+                .map(|s| {
+                    s.parse::<usize>()
+                        .unwrap_or(NETWORK_METRICS_PROCESSOR_BATCH_SIZE)
+                })
+                .unwrap_or(NETWORK_METRICS_PROCESSOR_BATCH_SIZE);
+        let network_processor_metrics_parallelism =
+            std::env::var("NETWORK_PROCESSOR_METRICS_PARALLELISM")
+                .map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
+                .unwrap_or(PARALLELISM);
+        Self {
+            store,
+            metrics,
+            network_processor_metrics_batch_size,
+            network_processor_metrics_parallelism,
+        }
+    }
+
+    pub async fn start(&self) -> IndexerResult<()> {
+        info!("Indexer network metrics async processor started...");
+        let latest_tx_count_metrics = self
+            .store
+            .get_latest_tx_count_metrics()
+            .await
+            .unwrap_or_default();
+        let latest_epoch_peak_tps = self
+            .store
+            .get_latest_epoch_peak_tps()
+            .await
+            .unwrap_or_default();
+        let mut last_processed_cp_seq = latest_tx_count_metrics
+            .unwrap_or_default()
+            .checkpoint_sequence_number;
+        let mut last_processed_peak_tps_epoch = latest_epoch_peak_tps.unwrap_or_default().epoch;
+        loop {
+            let mut latest_stored_checkpoint = self.store.get_latest_stored_checkpoint().await?;
+            while if let Some(cp) = latest_stored_checkpoint {
+                cp.sequence_number
+                    < last_processed_cp_seq + self.network_processor_metrics_batch_size as i64
+            } else {
+                true
+            } {
+                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                latest_stored_checkpoint = self.store.get_latest_stored_checkpoint().await?;
+            }
+
+            info!(
+                "Persisting tx count metrics for checkpoint sequence number {}",
+                last_processed_cp_seq
+            );
+            let batch_size = self.network_processor_metrics_batch_size;
+            let step_size = batch_size / self.network_processor_metrics_parallelism;
+            let mut persist_tasks = vec![];
+            for chunk_start_cp in (last_processed_cp_seq + 1
+                ..last_processed_cp_seq + batch_size as i64 + 1)
+                .step_by(step_size)
+            {
+                let store = self.store.clone();
+                persist_tasks.push(tokio::task::spawn_blocking(move || {
+                    store
+                        .persist_tx_count_metrics(chunk_start_cp, chunk_start_cp + step_size as i64)
+                }));
+            }
+            futures::future::join_all(persist_tasks)
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()
+                .tap_err(|e| {
+                    error!("Error joining network persist tasks: {:?}", e);
+                })?
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()
+                .tap_err(|e| {
+                    error!("Error persisting tx count metrics: {:?}", e);
+                })?;
+            last_processed_cp_seq += batch_size as i64;
+            info!(
+                "Persisted tx count metrics for checkpoint sequence number {}",
+                last_processed_cp_seq
+            );
+            self.metrics
+                .latest_network_metrics_cp_seq
+                .set(last_processed_cp_seq);
+
+            let end_cp = self
+                .store
+                .get_checkpoints_in_range(last_processed_cp_seq, last_processed_cp_seq + 1)
+                .await?
+                .first()
+                .ok_or(IndexerError::PostgresReadError(
+                    "Cannot read checkpoint from PG for epoch peak TPS".to_string(),
+                ))?
+                .clone();
+            for epoch in last_processed_peak_tps_epoch + 1..end_cp.epoch {
+                self.store.persist_epoch_peak_tps(epoch).await?;
+                last_processed_peak_tps_epoch = epoch;
+                info!("Persisted epoch peak TPS for epoch {}", epoch);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/crates/iota-indexer/src/handlers/processor_orchestrator.rs b/crates/iota-indexer/src/handlers/processor_orchestrator.rs
new file mode 100644
index 00000000000..0c78d1c0698
--- /dev/null
+++ b/crates/iota-indexer/src/handlers/processor_orchestrator.rs
@@ -0,0 +1,83 @@
+// Copyright (c) Mysten Labs, Inc.
+// Modifications Copyright (c) 2024 IOTA Stiftung
+// SPDX-License-Identifier: Apache-2.0
+
+use futures::future::try_join_all;
+use tracing::{error, info};
+
+use crate::metrics::IndexerMetrics;
+use crate::store::IndexerAnalyticalStore;
+
+use super::address_metrics_processor::AddressMetricsProcessor;
+use super::move_call_metrics_processor::MoveCallMetricsProcessor;
+use super::network_metrics_processor::NetworkMetricsProcessor;
+
+pub struct ProcessorOrchestrator<S> {
+    store: S,
+    metrics: IndexerMetrics,
+}
+
+impl<S> ProcessorOrchestrator<S>
+where
+    S: IndexerAnalyticalStore + Clone + Send + Sync + 'static,
+{
+    pub fn new(store: S, metrics: IndexerMetrics) -> Self {
+        Self { store, metrics }
+    }
+
+    pub async fn run_forever(&mut self) {
+        info!("Processor orchestrator started...");
+        let network_metrics_processor =
+            NetworkMetricsProcessor::new(self.store.clone(), self.metrics.clone());
+        let network_metrics_handle = tokio::task::spawn(async move {
+            loop {
+                let network_metrics_res = network_metrics_processor.start().await;
+                if let Err(e) = network_metrics_res {
+                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+                    error!(
+                        "Indexer network metrics processor failed with error {:?}, retrying in 5s...",
+                        e
+                    );
+                }
+            }
+        });
+
+        let addr_metrics_processor =
+            AddressMetricsProcessor::new(self.store.clone(), self.metrics.clone());
+        let addr_metrics_handle = tokio::task::spawn(async move {
+            loop {
+                let addr_metrics_res = addr_metrics_processor.start().await;
+                if let Err(e) = addr_metrics_res {
+                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+                    error!(
+                        "Indexer address metrics processor failed with error {:?}, retrying in 5s...",
+                        e
+                    );
+                }
+            }
+        });
+
+        let move_call_metrics_processor =
+            MoveCallMetricsProcessor::new(self.store.clone(), self.metrics.clone());
+        let move_call_metrics_handle = tokio::task::spawn(async move {
+            loop {
+                let move_call_metrics_res = move_call_metrics_processor.start().await;
+                if let Err(e) = move_call_metrics_res {
+                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+                    error!(
+                        "Indexer move call metrics processor failed with error {:?}, retrying in 5s...",
+                        e
+                    );
+                }
+            }
+        });
+
+        try_join_all(vec![
+            network_metrics_handle,
+            addr_metrics_handle,
+            move_call_metrics_handle,
+        ])
+        .await
+        .expect("Processor orchestrator should not run into errors.");
+    }
+}
\ No newline at end of file
diff --git a/crates/iota-indexer/src/indexer.rs b/crates/iota-indexer/src/indexer.rs
index 41ba04d54e5..3c6296aac64 100644
--- a/crates/iota-indexer/src/indexer.rs
+++ b/crates/iota-indexer/src/indexer.rs
@@ -22,6 +22,8 @@ use crate::{
     store::IndexerStore,
     IndexerConfig,
 };
+use crate::handlers::processor_orchestrator::ProcessorOrchestrator;
+use crate::store::PgIndexerAnalyticalStore;
 
 const DOWNLOAD_QUEUE_SIZE: usize = 200;
 
@@ -115,4 +117,17 @@
 
         Ok(())
     }
+
+    pub async fn start_analytical_worker(
+        store: PgIndexerAnalyticalStore,
+        metrics: IndexerMetrics,
+    ) -> Result<(), IndexerError> {
+        info!(
+            "IOTA Indexer Analytical Worker (version {:?}) started...",
+            env!("CARGO_PKG_VERSION")
+        );
+        let mut processor_orchestrator = ProcessorOrchestrator::new(store, metrics);
+        processor_orchestrator.run_forever().await;
+        Ok(())
+    }
 }
diff --git a/crates/iota-indexer/src/indexer_reader.rs b/crates/iota-indexer/src/indexer_reader.rs
index 974f980dd94..bd84ca74754 100644
--- a/crates/iota-indexer/src/indexer_reader.rs
+++ b/crates/iota-indexer/src/indexer_reader.rs
@@ -19,6 +19,7 @@ use iota_json_rpc_types::{
     IotaCoinMetadata, IotaEvent, IotaObjectDataFilter, IotaTransactionBlockEffects,
     IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse, TransactionFilter,
 };
+
 use iota_types::{
     balance::Supply,
     base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber, VersionNumber},
@@ -1510,6 +1511,119 @@
         .collect::>>()
     }
 
+    pub fn get_latest_network_metrics(&self) -> IndexerResult<NetworkMetrics> {
+        let metrics = self.run_query(|conn| {
+            diesel::sql_query("SELECT * FROM network_metrics;")
+                .get_result::<StoredNetworkMetrics>(conn)
+        })?;
+        Ok(metrics.into())
+    }
+
+    pub fn get_latest_move_call_metrics(&self) -> IndexerResult<MoveCallMetrics> {
+        let latest_3d_move_call_metrics = self.run_query(|conn| {
+            move_call_metrics::table
+                .filter(move_call_metrics::dsl::day.eq(3))
+                .order(move_call_metrics::dsl::id.desc())
+                .limit(10)
+                .load::<QueriedMoveCallMetrics>(conn)
+        })?;
+        let latest_7d_move_call_metrics = self.run_query(|conn| {
+            move_call_metrics::table
+                .filter(move_call_metrics::dsl::day.eq(7))
+                .order(move_call_metrics::dsl::id.desc())
+                .limit(10)
+                .load::<QueriedMoveCallMetrics>(conn)
+        })?;
+        let latest_30d_move_call_metrics = self.run_query(|conn| {
+            move_call_metrics::table
+                .filter(move_call_metrics::dsl::day.eq(30))
+                .order(move_call_metrics::dsl::id.desc())
+                .limit(10)
+                .load::<QueriedMoveCallMetrics>(conn)
+        })?;
+
+        let latest_3_days: Vec<(MoveFunctionName, usize)> = latest_3d_move_call_metrics
+            .into_iter()
+            .map(|m| m.try_into())
+            .collect::<Result<Vec<_>, _>>()?;
+        let latest_7_days: Vec<(MoveFunctionName, usize)> = latest_7d_move_call_metrics
+            .into_iter()
+            .map(|m| m.try_into())
+            .collect::<Result<Vec<_>, _>>()?;
+        let latest_30_days: Vec<(MoveFunctionName, usize)> = latest_30d_move_call_metrics
+            .into_iter()
+            .map(|m| m.try_into())
+            .collect::<Result<Vec<_>, _>>()?;
+        // sort by call count desc.
+        let rank_3_days = latest_3_days
+            .into_iter()
+            .sorted_by(|a, b| b.1.cmp(&a.1))
+            .collect::<Vec<_>>();
+        let rank_7_days = latest_7_days
+            .into_iter()
+            .sorted_by(|a, b| b.1.cmp(&a.1))
+            .collect::<Vec<_>>();
+        let rank_30_days = latest_30_days
+            .into_iter()
+            .sorted_by(|a, b| b.1.cmp(&a.1))
+            .collect::<Vec<_>>();
+        Ok(MoveCallMetrics {
+            rank_3_days,
+            rank_7_days,
+            rank_30_days,
+        })
+    }
+
+    pub fn get_latest_address_metrics(&self) -> IndexerResult<AddressMetrics> {
+        let stored_address_metrics = self.run_query(|conn| {
+            address_metrics::table
+                .order(address_metrics::dsl::checkpoint.desc())
+                .first::<StoredAddressMetrics>(conn)
+        })?;
+        Ok(stored_address_metrics.into())
+    }
+
+    pub fn get_checkpoint_address_metrics(
+        &self,
+        checkpoint_seq: u64,
+    ) -> IndexerResult<AddressMetrics> {
+        let stored_address_metrics = self.run_query(|conn| {
+            address_metrics::table
+                .filter(address_metrics::dsl::checkpoint.eq(checkpoint_seq as i64))
+                .first::<StoredAddressMetrics>(conn)
+        })?;
+        Ok(stored_address_metrics.into())
+    }
+
+    pub fn get_all_epoch_address_metrics(
+        &self,
+        descending_order: Option<bool>,
+    ) -> IndexerResult<Vec<AddressMetrics>> {
+        let is_descending = descending_order.unwrap_or_default();
+        let epoch_address_metrics_query = format!(
+            "WITH ranked_rows AS (
+                SELECT
+                  checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses,
+                  row_number() OVER(PARTITION BY epoch ORDER BY checkpoint DESC) as row_num
+                FROM
+                  address_metrics
+            )
+            SELECT
+              checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses
+            FROM ranked_rows
+            WHERE row_num = 1 ORDER BY epoch {}",
+            if is_descending { "DESC" } else { "ASC" },
+        );
+        let epoch_address_metrics = self.run_query(|conn| {
+            diesel::sql_query(epoch_address_metrics_query).load::<StoredAddressMetrics>(conn)
+        })?;
+
+        Ok(epoch_address_metrics
+            .into_iter()
+            .map(|stored_address_metrics| stored_address_metrics.into())
+            .collect())
+    }
+
     pub(crate) async fn get_display_fields(
         &self,
         original_object: &iota_types::object::Object,
diff --git a/crates/iota-indexer/src/lib.rs b/crates/iota-indexer/src/lib.rs
index 6431e147a2a..add2a3606ed 100644
--- a/crates/iota-indexer/src/lib.rs
+++ b/crates/iota-indexer/src/lib.rs
@@ -74,6 +74,8 @@ pub struct IndexerConfig {
     pub fullnode_sync_worker: bool,
     #[clap(long)]
     pub rpc_server_worker: bool,
+    #[clap(long)]
+    pub analytical_worker: bool,
 }
 
 impl IndexerConfig {
@@ -136,6 +138,7 @@ impl Default for IndexerConfig {
             reset_db: false,
             fullnode_sync_worker: true,
             rpc_server_worker: true,
+            analytical_worker: false,
         }
     }
 }
diff --git a/crates/iota-indexer/src/main.rs b/crates/iota-indexer/src/main.rs
index bca3acdfe82..a5c1e895aba 100644
--- a/crates/iota-indexer/src/main.rs
+++ b/crates/iota-indexer/src/main.rs
@@ -8,7 +8,9 @@ use iota_indexer::{
     errors::IndexerError,
     indexer::Indexer,
     metrics::{start_prometheus_server, IndexerMetrics},
-    store::PgIndexerStore,
+    store::{
+        PgIndexerAnalyticalStore, PgIndexerStore,
+    },
     IndexerConfig,
 };
 use tracing::{error, info};
@@ -92,6 +94,9 @@ async fn main() -> Result<(), IndexerError> {
         return Indexer::start_writer(&indexer_config, store, indexer_metrics).await;
     } else if indexer_config.rpc_server_worker {
         return Indexer::start_reader(&indexer_config, &registry, db_url).await;
+    } else if indexer_config.analytical_worker {
+        let store = PgIndexerAnalyticalStore::new(blocking_cp);
+        return Indexer::start_analytical_worker(store, indexer_metrics.clone()).await;
     }
     Ok(())
 }
diff --git a/crates/iota-indexer/src/metrics.rs b/crates/iota-indexer/src/metrics.rs
index 804bd8c9e7f..4385b97d5ce 100644
--- a/crates/iota-indexer/src/metrics.rs
+++ b/crates/iota-indexer/src/metrics.rs
@@ -91,6 +91,10 @@ pub struct IndexerMetrics {
     pub latest_tx_checkpoint_sequence_number: IntGauge,
     pub latest_indexer_object_checkpoint_sequence_number: IntGauge,
     pub latest_object_snapshot_sequence_number: IntGauge,
+    // analytical
+    pub latest_move_call_metrics_tx_seq: IntGauge,
+    pub latest_address_metrics_tx_seq: IntGauge,
+    pub latest_network_metrics_cp_seq: IntGauge,
     // checkpoint E2E latency is:
     // fullnode_download_latency + checkpoint_index_latency + db_commit_latency
     pub checkpoint_download_bytes_size: IntGauge,
@@ -241,6 +245,21 @@
                 "Latest object snapshot sequence number from the Indexer",
                 registry,
             ).unwrap(),
+            latest_move_call_metrics_tx_seq: register_int_gauge_with_registry!(
+                "latest_move_call_metrics_tx_seq",
+                "Latest move call metrics tx seq",
+                registry,
+            ).unwrap(),
+            latest_address_metrics_tx_seq: register_int_gauge_with_registry!(
+                "latest_address_metrics_tx_seq",
+                "Latest address metrics tx seq",
+                registry,
+            ).unwrap(),
+            latest_network_metrics_cp_seq: register_int_gauge_with_registry!(
+                "latest_network_metrics_cp_seq",
+                "Latest network metrics cp seq",
+                registry,
+            ).unwrap(),
             checkpoint_download_bytes_size: register_int_gauge_with_registry!(
                 "checkpoint_download_bytes_size",
                 "Size of the downloaded checkpoint in bytes",
diff --git a/crates/iota-indexer/src/models/address_metrics.rs b/crates/iota-indexer/src/models/address_metrics.rs
new file mode 100644
index 00000000000..bc3dfbdf0d3
--- /dev/null
+++ b/crates/iota-indexer/src/models/address_metrics.rs
@@ -0,0 +1,112 @@
+// Copyright (c) Mysten Labs, Inc.
+// Modifications Copyright (c) 2024 IOTA Stiftung
+// SPDX-License-Identifier: Apache-2.0
+
+use std::collections::HashMap;
+
+use diesel::prelude::*;
+use diesel::sql_types::BigInt;
+
+use iota_json_rpc_types::AddressMetrics;
+
+use crate::schema::{active_addresses, address_metrics, addresses};
+
+#[derive(Clone, Debug, Queryable, Insertable)]
+#[diesel(table_name = addresses)]
+pub struct StoredAddress {
+    pub address: Vec<u8>,
+    pub first_appearance_tx: i64,
+    pub first_appearance_time: i64,
+    pub last_appearance_tx: i64,
+    pub last_appearance_time: i64,
+}
+
+#[derive(Clone, Debug, Queryable, Insertable)]
+#[diesel(table_name = active_addresses)]
+pub struct StoredActiveAddress {
+    pub address: Vec<u8>,
+    pub first_appearance_tx: i64,
+    pub first_appearance_time: i64,
+    pub last_appearance_tx: i64,
+    pub last_appearance_time: i64,
+}
+
+impl From<StoredAddress> for StoredActiveAddress {
+    fn from(address: StoredAddress) -> Self {
+        StoredActiveAddress {
+            address: address.address,
+            first_appearance_tx: address.first_appearance_tx,
+            first_appearance_time: address.first_appearance_time,
+            last_appearance_tx: address.last_appearance_tx,
+            last_appearance_time: address.last_appearance_time,
+        }
+    }
+}
+
+#[derive(Clone, Debug, Default, Queryable, Insertable, QueryableByName)]
+#[diesel(table_name = address_metrics)]
+pub struct StoredAddressMetrics {
+    #[diesel(sql_type = BigInt)]
+    pub checkpoint: i64,
+    #[diesel(sql_type = BigInt)]
+    pub epoch: i64,
+    #[diesel(sql_type = BigInt)]
+    pub timestamp_ms: i64,
+    #[diesel(sql_type = BigInt)]
+    pub cumulative_addresses: i64,
+    #[diesel(sql_type = BigInt)]
+    pub cumulative_active_addresses: i64,
+    #[diesel(sql_type = BigInt)]
+    pub daily_active_addresses: i64,
+}
+
+impl From<StoredAddressMetrics> for AddressMetrics {
+    fn from(metrics: StoredAddressMetrics) -> Self {
+        Self {
+            checkpoint: metrics.checkpoint as u64,
+            epoch: metrics.epoch as u64,
+            timestamp_ms: metrics.timestamp_ms as u64,
+            cumulative_addresses: metrics.cumulative_addresses as u64,
+            cumulative_active_addresses: metrics.cumulative_active_addresses as u64,
+            daily_active_addresses: metrics.daily_active_addresses as u64,
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct AddressInfoToCommit {
+    pub address: Vec<u8>,
+    pub tx_seq: i64,
+    pub timestamp_ms: i64,
+}
+
+pub fn dedup_addresses(addrs_to_commit: Vec<AddressInfoToCommit>) -> Vec<StoredAddress> {
+    let mut compressed_addr_map: HashMap<_, StoredAddress> = HashMap::new();
+    for addr_to_commit in addrs_to_commit {
+        let entry = compressed_addr_map
+            .entry(addr_to_commit.address.clone())
+            .or_insert_with(|| StoredAddress {
+                address: addr_to_commit.address.clone(),
+                first_appearance_time: addr_to_commit.timestamp_ms,
+                first_appearance_tx: addr_to_commit.tx_seq,
+                last_appearance_time: addr_to_commit.timestamp_ms,
+                last_appearance_tx: addr_to_commit.tx_seq,
+            });
+
+        if addr_to_commit.timestamp_ms < entry.first_appearance_time {
+            entry.first_appearance_time = addr_to_commit.timestamp_ms;
+            entry.first_appearance_tx = addr_to_commit.tx_seq;
+        }
+        if addr_to_commit.timestamp_ms > entry.last_appearance_time {
+            entry.last_appearance_time = addr_to_commit.timestamp_ms;
+            entry.last_appearance_tx = addr_to_commit.tx_seq;
+        }
+    }
+    compressed_addr_map.values().cloned().collect()
+}
+
+#[derive(Clone, Debug)]
+pub struct TxTimestampInfo {
+    pub tx_seq: i64,
+    pub timestamp_ms: i64,
+}
\ No newline at end of file
diff --git a/crates/iota-indexer/src/models/mod.rs b/crates/iota-indexer/src/models/mod.rs
index cfa32ddb294..a0e337e95c3 100644
--- a/crates/iota-indexer/src/models/mod.rs
+++ b/crates/iota-indexer/src/models/mod.rs
@@ -2,11 +2,15 @@
 // Modifications Copyright (c) 2024 IOTA Stiftung
 // SPDX-License-Identifier: Apache-2.0
 
+pub mod address_metrics;
 pub mod checkpoints;
 pub mod display;
 pub mod epoch;
 pub mod events;
+pub mod move_call_metrics;
+pub mod network_metrics;
 pub mod objects;
 pub mod packages;
 pub mod transactions;
+pub mod tx_count_metrics;
 pub mod tx_indices;
diff --git a/crates/iota-indexer/src/models/move_call_metrics.rs b/crates/iota-indexer/src/models/move_call_metrics.rs
new file mode 100644
index 00000000000..e7652426d9a
--- /dev/null
+++ b/crates/iota-indexer/src/models/move_call_metrics.rs
@@ -0,0 +1,124 @@
+// Copyright (c) Mysten Labs, Inc.
+// Modifications Copyright (c) 2024 IOTA Stiftung
+// SPDX-License-Identifier: Apache-2.0
+
+use std::str::FromStr;
+
+use diesel::prelude::*;
+use diesel::sql_types::{BigInt, Binary, Text};
+use diesel::QueryableByName;
+
+use move_core_types::identifier::Identifier;
+use iota_json_rpc_types::MoveFunctionName;
+use iota_types::base_types::ObjectID;
+
+use crate::errors::IndexerError;
+use crate::schema::{move_call_metrics, move_calls};
+
+#[derive(Clone, Debug, Queryable, Insertable)]
+#[diesel(table_name = move_calls)]
+pub struct StoredMoveCall {
+    pub transaction_sequence_number: i64,
+    pub checkpoint_sequence_number: i64,
+    pub epoch: i64,
+    pub move_package: Vec<u8>,
+    pub move_module: String,
+    pub move_function: String,
+}
+
+#[derive(Clone, Debug, Insertable)]
+#[diesel(table_name = move_call_metrics)]
+pub struct StoredMoveCallMetrics {
+    pub id: Option<i64>,
+    pub epoch: i64,
+    pub day: i64,
+    pub move_package: String,
+    pub move_module: String,
+    pub move_function: String,
+    pub count: i64,
+}
+
+impl Default for StoredMoveCallMetrics {
+    fn default() -> Self {
+        Self {
+            id: None,
+            epoch: -1,
+            day: -1,
+            move_package: "".to_string(),
+            move_module: "".to_string(),
+            move_function: "".to_string(),
+            count: -1,
+        }
+    }
+}
+
+// for auto-incremented id, the committed id is None, so Option<i64>,
+// but when querying, the returned type is i64, thus a separate type is needed.
+#[derive(Clone, Debug, Queryable)]
+#[diesel(table_name = move_call_metrics)]
+pub struct QueriedMoveCallMetrics {
+    pub id: i64,
+    pub epoch: i64,
+    pub day: i64,
+    pub move_package: String,
+    pub move_module: String,
+    pub move_function: String,
+    pub count: i64,
+}
+
+impl TryInto<(MoveFunctionName, usize)> for QueriedMoveCallMetrics {
+    type Error = IndexerError;
+
+    fn try_into(self) -> Result<(MoveFunctionName, usize), Self::Error> {
+        let package = ObjectID::from_str(&self.move_package)?;
+        let module = Identifier::from_str(&self.move_module)?;
+        let function = Identifier::from_str(&self.move_function)?;
+        Ok((
+            MoveFunctionName {
+                package,
+                module,
+                function,
+            },
+            self.count as usize,
+        ))
+    }
+}
+
+impl From<QueriedMoveCallMetrics> for StoredMoveCallMetrics {
+    fn from(q: QueriedMoveCallMetrics) -> Self {
+        StoredMoveCallMetrics {
+            id: Some(q.id),
+            epoch: q.epoch,
+            day: q.day,
+            move_package: q.move_package,
+            move_module: q.move_module,
+            move_function: q.move_function,
+            count: q.count,
+        }
+    }
+}
+
+#[derive(QueryableByName, Debug, Clone, Default)]
+pub struct QueriedMoveMetrics {
+    #[diesel(sql_type = BigInt)]
+    pub epoch: i64,
+    #[diesel(sql_type = BigInt)]
+    pub day: i64,
+    #[diesel(sql_type = Binary)]
+    pub move_package: Vec<u8>,
+    #[diesel(sql_type = Text)]
+    pub move_module: String,
+    #[diesel(sql_type = Text)]
+    pub move_function: String,
+    #[diesel(sql_type = BigInt)]
+    pub count: i64,
+}
+
+pub fn build_move_call_metric_query(epoch: i64, days: i64) -> String {
+    format!("SELECT {}::BIGINT AS epoch, {}::BIGINT AS day, move_package, move_module, move_function, COUNT(*)::BIGINT AS count
+    FROM move_calls
+    WHERE epoch >= {}
+    GROUP BY move_package, move_module, move_function
+    ORDER BY count DESC
+    LIMIT 10;", epoch, days, epoch - days)
+}
\ No newline at end of file
diff --git a/crates/iota-indexer/src/models/network_metrics.rs b/crates/iota-indexer/src/models/network_metrics.rs
new file mode 100644
index 00000000000..7fa2d1c5219
--- /dev/null
+++ b/crates/iota-indexer/src/models/network_metrics.rs
@@ -0,0 +1,66 @@
+// Copyright (c) Mysten Labs, Inc.
+// Modifications Copyright (c) 2024 IOTA Stiftung
+// SPDX-License-Identifier: Apache-2.0
+
+use diesel::prelude::*;
+use diesel::sql_types::{BigInt, Double, Float8};
+
+use iota_json_rpc_types::NetworkMetrics;
+
+use crate::schema::epoch_peak_tps;
+
+#[derive(Clone, Debug, Queryable, Insertable)]
+#[diesel(table_name = epoch_peak_tps)]
+pub struct StoredEpochPeakTps {
+    pub epoch: i64,
+    pub peak_tps: f64,
+    pub peak_tps_30d: f64,
+}
+
+impl Default for StoredEpochPeakTps {
+    fn default() -> Self {
+        Self {
+            epoch: -1,
+            peak_tps: 0.0,
+            peak_tps_30d: 0.0,
+        }
+    }
+}
+
+#[derive(QueryableByName, Debug, Clone, Default)]
+pub struct StoredNetworkMetrics {
+    #[diesel(sql_type = Double)]
+    pub current_tps: f64,
+    #[diesel(sql_type = Double)]
+    pub tps_30_days: f64,
+    #[diesel(sql_type = BigInt)]
+    pub total_packages: i64,
+    #[diesel(sql_type = BigInt)]
+    pub total_addresses: i64,
+    #[diesel(sql_type = BigInt)]
+    pub total_objects: i64,
+    #[diesel(sql_type = BigInt)]
+    pub current_epoch: i64,
+    #[diesel(sql_type = BigInt)]
+    pub current_checkpoint: i64,
+}
+
+impl From<StoredNetworkMetrics> for NetworkMetrics {
+    fn from(db: StoredNetworkMetrics) -> Self {
+        Self {
+            current_tps: db.current_tps,
+            tps_30_days: db.tps_30_days,
+            total_packages: db.total_packages as u64,
+            total_addresses: db.total_addresses as u64,
+            total_objects: db.total_objects as u64,
+            current_epoch: db.current_epoch as u64,
+            current_checkpoint: db.current_checkpoint as u64,
+        }
+    }
+}
+
+#[derive(Debug, QueryableByName)]
+pub struct Tps {
+    #[diesel(sql_type = Float8)]
+    pub peak_tps: f64,
+}
\ No newline at end of file
diff --git a/crates/iota-indexer/src/models/tx_count_metrics.rs b/crates/iota-indexer/src/models/tx_count_metrics.rs
new file mode 100644
index 00000000000..388eff9bb89
--- /dev/null
+++ b/crates/iota-indexer/src/models/tx_count_metrics.rs
@@ -0,0 +1,31 @@
+// Copyright (c) Mysten Labs, Inc.
+// Modifications Copyright (c) 2024 IOTA Stiftung
+// SPDX-License-Identifier: Apache-2.0
+
+use diesel::prelude::*;
+
+use crate::schema::tx_count_metrics;
+
+#[derive(Clone, Debug, Queryable, Insertable)]
+#[diesel(table_name = tx_count_metrics)]
+pub struct StoredTxCountMetrics {
+    pub checkpoint_sequence_number: i64,
+    pub epoch: i64,
+    pub timestamp_ms: i64,
+    pub total_transaction_blocks: i64,
+    pub total_successful_transaction_blocks: i64,
+    pub total_successful_transactions: i64,
+}
+
+impl Default for StoredTxCountMetrics {
+    fn default() -> Self {
+        Self {
+            checkpoint_sequence_number: -1,
+            epoch: -1,
+            timestamp_ms: -1,
+            total_transaction_blocks: -1,
+            total_successful_transaction_blocks: -1,
+            total_successful_transactions: -1,
+        }
+    }
+}
\ No newline at end of file
diff --git a/crates/iota-indexer/src/store/indexer_analytics_store.rs b/crates/iota-indexer/src/store/indexer_analytics_store.rs
new file mode 100644
index 00000000000..0e964a3b931
--- /dev/null
+++ b/crates/iota-indexer/src/store/indexer_analytics_store.rs
@@ -0,0 +1,77 @@
+// Copyright (c) Mysten Labs, Inc.
+// Modifications Copyright (c) 2024 IOTA Stiftung
+// SPDX-License-Identifier: Apache-2.0
+
+use async_trait::async_trait;
+
+use crate::models::checkpoints::StoredCheckpoint;
+use crate::models::move_call_metrics::StoredMoveCallMetrics;
+use crate::models::network_metrics::StoredEpochPeakTps;
+use crate::models::transactions::{
+    StoredTransaction, StoredTransactionCheckpoint, StoredTransactionSuccessCommandCount,
+    StoredTransactionTimestamp, TxSeq,
+};
+use crate::models::tx_count_metrics::StoredTxCountMetrics;
+use crate::types::IndexerResult;
+
+#[async_trait]
+pub trait IndexerAnalyticalStore {
+    async fn get_latest_stored_transaction(&self) -> IndexerResult<Option<StoredTransaction>>;
+    async fn get_latest_stored_checkpoint(&self) -> IndexerResult<Option<StoredCheckpoint>>;
+    async fn get_checkpoints_in_range(
+        &self,
+        start_checkpoint: i64,
+        end_checkpoint: i64,
+    ) -> IndexerResult<Vec<StoredCheckpoint>>;
+    async fn get_tx_timestamps_in_checkpoint_range(
+        &self,
+        start_checkpoint: i64,
+        end_checkpoint: i64,
+    ) -> IndexerResult<Vec<StoredTransactionTimestamp>>;
+    async fn get_tx_checkpoints_in_checkpoint_range(
+        &self,
+        start_checkpoint: i64,
+        end_checkpoint: i64,
+    ) -> IndexerResult<Vec<StoredTransactionCheckpoint>>;
+    async fn get_tx_success_cmd_counts_in_checkpoint_range(
+        &self,
+        start_checkpoint: i64,
+        end_checkpoint: i64,
+    ) -> IndexerResult<Vec<StoredTransactionSuccessCommandCount>>;
+    async fn get_tx(&self, tx_sequence_number: i64) -> IndexerResult<Option<StoredTransaction>>;
+    async fn get_cp(&self, sequence_number: i64) -> IndexerResult<Option<StoredCheckpoint>>;
+
+    // for network metrics including TPS and counts of objects etc.
+    async fn get_latest_tx_count_metrics(&self) -> IndexerResult<Option<StoredTxCountMetrics>>;
+    async fn get_latest_epoch_peak_tps(&self) -> IndexerResult<Option<StoredEpochPeakTps>>;
+    fn persist_tx_count_metrics(
+        &self,
+        start_checkpoint: i64,
+        end_checkpoint: i64,
+    ) -> IndexerResult<()>;
+    async fn persist_epoch_peak_tps(&self, epoch: i64) -> IndexerResult<()>;
+
+    // for address metrics
+    async fn get_address_metrics_last_processed_tx_seq(&self) -> IndexerResult<Option<TxSeq>>;
+    fn persist_addresses_in_tx_range(
+        &self,
+        start_tx_seq: i64,
+        end_tx_seq: i64,
+    ) -> IndexerResult<()>;
+    fn persist_active_addresses_in_tx_range(
+        &self,
+        start_tx_seq: i64,
+        end_tx_seq: i64,
+    ) -> IndexerResult<()>;
+    async fn calculate_and_persist_address_metrics(&self, checkpoint: i64) -> IndexerResult<()>;
+
+    // for move call metrics
+    async fn get_latest_move_call_metrics(&self) -> IndexerResult<Option<StoredMoveCallMetrics>>;
+    async fn get_latest_move_call_tx_seq(&self) -> IndexerResult<Option<TxSeq>>;
+    fn persist_move_calls_in_tx_range(
+        &self,
+        start_tx_seq: i64,
+        end_tx_seq: i64,
+    ) -> IndexerResult<()>;
+    async fn calculate_and_persist_move_call_metrics(&self, epoch: i64) -> IndexerResult<()>;
+}
\ No newline at end of file
diff --git a/crates/iota-indexer/src/store/mod.rs b/crates/iota-indexer/src/store/mod.rs
index f0417495fd2..cd233140c2f 100644
--- a/crates/iota-indexer/src/store/mod.rs
+++ b/crates/iota-indexer/src/store/mod.rs
@@ -2,11 +2,15 @@
 // Modifications Copyright (c) 2024 IOTA Stiftung
 // SPDX-License-Identifier: Apache-2.0
 
+pub(crate) use indexer_analytics_store::*;
 pub(crate) use indexer_store::*;
+pub use pg_indexer_analytical_store::PgIndexerAnalyticalStore;
 pub use pg_indexer_store::PgIndexerStore;
 
+mod indexer_analytics_store;
 pub mod indexer_store;
 pub mod module_resolver;
+mod pg_indexer_analytical_store;
 mod pg_indexer_store;
 mod pg_partition_manager;
 pub mod query;
diff --git a/crates/iota-indexer/src/store/pg_indexer_analytical_store.rs b/crates/iota-indexer/src/store/pg_indexer_analytical_store.rs
new file mode 100644
index 00000000000..e9fa3a4bc02
--- /dev/null
+++ b/crates/iota-indexer/src/store/pg_indexer_analytical_store.rs
@@ -0,0 +1,643 @@
+// Copyright (c) Mysten Labs, Inc.
+// Modifications Copyright (c) 2024 IOTA Stiftung
+// SPDX-License-Identifier: Apache-2.0
+
+use std::time::Duration;
+use tap::tap::TapFallible;
+use tracing::{error, info};
+
+use async_trait::async_trait;
+use core::result::Result::Ok;
+use diesel::dsl::count;
+use diesel::{ExpressionMethods, OptionalExtension};
+use diesel::{QueryDsl, RunQueryDsl};
+use iota_types::base_types::ObjectID;
+
+use crate::db::PgConnectionPool;
+use crate::errors::{Context, IndexerError};
+use crate::models::address_metrics::StoredAddressMetrics;
+use crate::models::checkpoints::StoredCheckpoint;
+use crate::models::move_call_metrics::{
+    build_move_call_metric_query, QueriedMoveCallMetrics, QueriedMoveMetrics, StoredMoveCallMetrics,
+};
+use crate::models::network_metrics::{StoredEpochPeakTps, Tps};
+use crate::models::transactions::{
+    StoredTransaction, StoredTransactionCheckpoint, StoredTransactionSuccessCommandCount,
+    StoredTransactionTimestamp, TxSeq,
+};
+use crate::models::tx_count_metrics::StoredTxCountMetrics;
+use crate::schema::{
+    active_addresses, address_metrics, addresses, checkpoints, epoch_peak_tps, move_call_metrics,
+    move_calls, transactions, tx_count_metrics,
+};
+use crate::store::diesel_macro::{read_only_blocking, transactional_blocking_with_retry};
+use crate::types::IndexerResult;
+
+use super::IndexerAnalyticalStore;
+
+#[derive(Clone)]
+pub struct PgIndexerAnalyticalStore {
+    blocking_cp: PgConnectionPool,
+}
+
+impl PgIndexerAnalyticalStore {
+    pub fn new(blocking_cp: PgConnectionPool) -> Self {
+        Self { blocking_cp }
+    }
+}
+
+#[async_trait]
+impl IndexerAnalyticalStore for PgIndexerAnalyticalStore {
+    async fn get_latest_stored_checkpoint(&self) -> IndexerResult<Option<StoredCheckpoint>> {
+        let latest_cp = read_only_blocking!(&self.blocking_cp, |conn| {
+            checkpoints::dsl::checkpoints
+                .order(checkpoints::sequence_number.desc())
+                .first::<StoredCheckpoint>(conn)
+                .optional()
+        })
+        .context("Failed reading latest checkpoint from PostgresDB")?;
+        Ok(latest_cp)
+    }
+
+    async fn get_latest_stored_transaction(&self) -> IndexerResult<Option<StoredTransaction>> {
+        let latest_tx = read_only_blocking!(&self.blocking_cp, |conn| {
+            transactions::dsl::transactions
+                .order(transactions::tx_sequence_number.desc())
+                .first::<StoredTransaction>(conn)
+                .optional()
+        })
+        .context("Failed reading latest transaction from PostgresDB")?;
+        Ok(latest_tx)
+    }
+
+    async fn get_checkpoints_in_range(
+        &self,
+        start_checkpoint: i64,
+        end_checkpoint: i64,
+    ) -> IndexerResult<Vec<StoredCheckpoint>> {
+        let cps = read_only_blocking!(&self.blocking_cp, |conn| {
+            checkpoints::dsl::checkpoints
+                .filter(checkpoints::sequence_number.ge(start_checkpoint))
+                .filter(checkpoints::sequence_number.lt(end_checkpoint))
+                .order(checkpoints::sequence_number.asc())
+                .load::<StoredCheckpoint>(conn)
+        })
+        .context("Failed reading checkpoints from PostgresDB")?;
+        Ok(cps)
+    }
+
+    async fn get_tx_timestamps_in_checkpoint_range(
+        &self,
+        start_checkpoint: i64,
+        end_checkpoint: i64,
+    ) -> IndexerResult<Vec<StoredTransactionTimestamp>> {
+        let tx_timestamps = read_only_blocking!(&self.blocking_cp, |conn| {
+            transactions::dsl::transactions
+                .filter(transactions::dsl::checkpoint_sequence_number.ge(start_checkpoint))
+                .filter(transactions::dsl::checkpoint_sequence_number.lt(end_checkpoint))
+                .order(transactions::dsl::tx_sequence_number.asc())
+                .select((
+                    transactions::dsl::tx_sequence_number,
+                    transactions::dsl::timestamp_ms,
+                ))
+                .load::<StoredTransactionTimestamp>(conn)
+        })
+        .context("Failed reading transaction timestamps from PostgresDB")?;
+        Ok(tx_timestamps)
+    }
+
+    async fn get_tx_checkpoints_in_checkpoint_range(
+        &self,
+        start_checkpoint: i64,
+        end_checkpoint: i64,
+    ) -> IndexerResult<Vec<StoredTransactionCheckpoint>> {
+        let tx_checkpoints = read_only_blocking!(&self.blocking_cp, |conn| {
+            transactions::dsl::transactions
+                .filter(transactions::dsl::checkpoint_sequence_number.ge(start_checkpoint))
+                .filter(transactions::dsl::checkpoint_sequence_number.lt(end_checkpoint))
+                .order(transactions::dsl::tx_sequence_number.asc())
+                .select((
+                    transactions::dsl::tx_sequence_number,
+                    transactions::dsl::checkpoint_sequence_number,
+                ))
+                .load::<StoredTransactionCheckpoint>(conn)
+        })
+        .context("Failed reading transaction checkpoints from PostgresDB")?;
+        Ok(tx_checkpoints)
+    }
+
+    async fn get_tx_success_cmd_counts_in_checkpoint_range(
+        &self,
+        start_checkpoint: i64,
+        end_checkpoint: i64,
+    ) -> IndexerResult<Vec<StoredTransactionSuccessCommandCount>> {
+        let tx_success_cmd_counts = read_only_blocking!(&self.blocking_cp, |conn| {
+            transactions::dsl::transactions
+                .filter(transactions::dsl::checkpoint_sequence_number.ge(start_checkpoint))
+                .filter(transactions::dsl::checkpoint_sequence_number.lt(end_checkpoint))
+                .order(transactions::dsl::tx_sequence_number.asc())
+                .select((
+                    transactions::dsl::tx_sequence_number,
+                    transactions::dsl::checkpoint_sequence_number,
+                    transactions::dsl::success_command_count,
+                    transactions::dsl::timestamp_ms,
+                ))
+                .load::<StoredTransactionSuccessCommandCount>(conn)
+        })
+        .context("Failed reading transaction success command counts from PostgresDB")?;
+        Ok(tx_success_cmd_counts)
+    }
+
+    async fn get_tx(&self, tx_sequence_number: i64) -> IndexerResult<Option<StoredTransaction>> {
+        let tx = read_only_blocking!(&self.blocking_cp, |conn| {
+            transactions::dsl::transactions
+                .filter(transactions::dsl::tx_sequence_number.eq(tx_sequence_number))
+                .first::<StoredTransaction>(conn)
+                .optional()
+        })
+        .context("Failed reading transaction from PostgresDB")?;
+        Ok(tx)
+    }
+
+    async fn get_cp(&self, sequence_number: i64) -> IndexerResult<Option<StoredCheckpoint>> {
+        let cp = read_only_blocking!(&self.blocking_cp, |conn| {
+            checkpoints::dsl::checkpoints
+                .filter(checkpoints::dsl::sequence_number.eq(sequence_number))
+                .first::<StoredCheckpoint>(conn)
+                .optional()
+        })
+        .context("Failed reading checkpoint from PostgresDB")?;
+        Ok(cp)
+    }
+
+    async fn get_latest_tx_count_metrics(&self) -> IndexerResult<Option<StoredTxCountMetrics>> {
+        let latest_tx_count = read_only_blocking!(&self.blocking_cp, |conn| {
+            tx_count_metrics::dsl::tx_count_metrics
+                .order(tx_count_metrics::dsl::checkpoint_sequence_number.desc())
+                .first::<StoredTxCountMetrics>(conn)
+                .optional()
+        })
+        .context("Failed reading latest tx count metrics from PostgresDB")?;
+        Ok(latest_tx_count)
+    }
+
+    async fn get_latest_epoch_peak_tps(&self) -> IndexerResult<Option<StoredEpochPeakTps>> {
+        let latest_network_metrics = read_only_blocking!(&self.blocking_cp, |conn| {
+            epoch_peak_tps::dsl::epoch_peak_tps
+                .order(epoch_peak_tps::dsl::epoch.desc())
+                .first::<StoredEpochPeakTps>(conn)
+                .optional()
+        })
+        .context("Failed reading latest epoch peak TPS from PostgresDB")?;
+        Ok(latest_network_metrics)
+    }
+
+    fn persist_tx_count_metrics(
+        &self,
+        start_checkpoint: i64,
+        end_checkpoint: i64,
+    ) -> IndexerResult<()> {
+        let tx_count_query = construct_checkpoint_tx_count_query(start_checkpoint, end_checkpoint);
+        info!("Persisting tx count metrics for cp {}", start_checkpoint);
+        transactional_blocking_with_retry!(
+            &self.blocking_cp,
+            |conn| {
+                diesel::sql_query(tx_count_query.clone()).execute(conn)?;
+                Ok::<(), IndexerError>(())
+            },
+            Duration::from_secs(10)
+        )
+        .context("Failed persisting tx count metrics to PostgresDB")?;
+        info!("Persisted tx count metrics for cp {}", start_checkpoint);
+        Ok(())
+    }
+
+    async fn persist_epoch_peak_tps(&self, epoch: i64) -> IndexerResult<()> {
+        let epoch_peak_tps_query = construct_peak_tps_query(epoch, 1);
+        let peak_tps_30d_query = construct_peak_tps_query(epoch, 30);
+        let epoch_tps: Tps =
+            read_only_blocking!(&self.blocking_cp, |conn| diesel::RunQueryDsl::get_result(
+                diesel::sql_query(epoch_peak_tps_query),
+                conn
+            ))
+            .context("Failed reading epoch peak TPS from PostgresDB")?;
+        let tps_30d: Tps =
+            read_only_blocking!(&self.blocking_cp, |conn| diesel::RunQueryDsl::get_result(
+                diesel::sql_query(peak_tps_30d_query),
+                conn
+            ))
+            .context("Failed reading 30d peak TPS from PostgresDB")?;
+
+        let epoch_peak_tps = StoredEpochPeakTps {
+            epoch,
+            peak_tps: epoch_tps.peak_tps,
+            peak_tps_30d: tps_30d.peak_tps,
+        };
+        transactional_blocking_with_retry!(
+            &self.blocking_cp,
+            |conn| {
+                diesel::insert_into(epoch_peak_tps::table)
+                    .values(epoch_peak_tps.clone())
+                    .on_conflict_do_nothing()
+                    .execute(conn)
+            },
+            Duration::from_secs(10)
+        )
+        .context("Failed persisting epoch peak TPS to PostgresDB.")?;
+        Ok(())
+    }
+
+    async fn get_address_metrics_last_processed_tx_seq(&self) -> IndexerResult<Option<TxSeq>> {
+        let last_processed_tx_seq = read_only_blocking!(&self.blocking_cp, |conn| {
+            active_addresses::dsl::active_addresses
+                .order(active_addresses::dsl::last_appearance_tx.desc())
+                .select((active_addresses::dsl::last_appearance_tx,))
+                .first::<TxSeq>(conn)
+                .optional()
+        })
+        .context("Failed to read address metrics last processed tx sequence.")?;
+        Ok(last_processed_tx_seq)
+    }
+
+    fn persist_addresses_in_tx_range(
+        &self,
+        start_tx_seq: i64,
+        end_tx_seq: i64,
+    ) -> IndexerResult<()> {
+        let address_persist_query = construct_address_persisting_query(start_tx_seq, end_tx_seq);
+        transactional_blocking_with_retry!(
+            &self.blocking_cp,
+            |conn| {
+                diesel::sql_query(address_persist_query.clone()).execute(conn)?;
+                Ok::<(), IndexerError>(())
+            },
+            Duration::from_secs(10)
+        )
+        .context("Failed persisting addresses to PostgresDB")?;
+        Ok(())
+    }
+
+    fn persist_active_addresses_in_tx_range(
+        &self,
+        start_tx_seq: i64,
+        end_tx_seq: i64,
+    ) -> IndexerResult<()> {
+        let active_address_persist_query =
+            construct_active_address_persisting_query(start_tx_seq, end_tx_seq);
+        transactional_blocking_with_retry!(
+            &self.blocking_cp,
+            |conn| {
+                diesel::sql_query(active_address_persist_query.clone()).execute(conn)?;
+                Ok::<(), IndexerError>(())
+            },
+            Duration::from_secs(10)
+        )
+        .context("Failed persisting active addresses to PostgresDB")?;
+        Ok(())
+    }
+
+    async fn calculate_and_persist_address_metrics(&self, checkpoint: i64) -> IndexerResult<()> {
+        let mut checkpoint_opt = self
+            .get_checkpoints_in_range(checkpoint, checkpoint + 1)
+            .await?
+            .pop();
+        while checkpoint_opt.is_none() {
+            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+            checkpoint_opt = self
+                .get_checkpoints_in_range(checkpoint, checkpoint + 1)
+                .await?
+                .pop();
+        }
+        let checkpoint = checkpoint_opt.unwrap();
+        let cp_timestamp_ms = checkpoint.timestamp_ms;
+        let addr_count = read_only_blocking!(&self.blocking_cp, |conn| {
+            addresses::dsl::addresses
+                .filter(addresses::first_appearance_time.le(cp_timestamp_ms))
+                .count()
+                .get_result::<i64>(conn)
+        })?;
+        let active_addr_count = read_only_blocking!(&self.blocking_cp, |conn| {
+            active_addresses::dsl::active_addresses
+                .filter(active_addresses::first_appearance_time.le(cp_timestamp_ms))
+                .count()
+                .get_result::<i64>(conn)
+        })?;
+        let time_one_day_ago = cp_timestamp_ms - 1000 * 60 * 60 * 24;
+        let daily_active_addresses = read_only_blocking!(&self.blocking_cp, |conn| {
+            active_addresses::dsl::active_addresses
+                .filter(active_addresses::first_appearance_time.le(cp_timestamp_ms))
+                .filter(active_addresses::last_appearance_time.gt(time_one_day_ago))
+                .select(count(active_addresses::address))
+                .first(conn)
+        })?;
+        let address_metrics_to_commit = StoredAddressMetrics {
+            checkpoint: checkpoint.sequence_number,
+            epoch: checkpoint.epoch,
+            timestamp_ms: checkpoint.timestamp_ms,
+            cumulative_addresses: addr_count,
+            cumulative_active_addresses: active_addr_count,
+            daily_active_addresses,
+        };
+        transactional_blocking_with_retry!(
+            &self.blocking_cp,
+            |conn| {
+                diesel::insert_into(address_metrics::table)
+                    .values(address_metrics_to_commit.clone())
+                    .on_conflict_do_nothing()
+                    .execute(conn)
+            },
+            Duration::from_secs(60)
+        )
+        .context("Failed persisting address metrics to PostgresDB")?;
+        Ok(())
+    }
+
+    async fn get_latest_move_call_tx_seq(&self) -> IndexerResult<Option<TxSeq>> {
+        let last_processed_tx_seq = read_only_blocking!(&self.blocking_cp, |conn| {
+            move_calls::dsl::move_calls
+                .order(move_calls::dsl::transaction_sequence_number.desc())
+                .select((move_calls::dsl::transaction_sequence_number,))
+                .first::<TxSeq>(conn)
+                .optional()
+        })
+        .unwrap_or_default();
+        Ok(last_processed_tx_seq)
+    }
+
+    async fn get_latest_move_call_metrics(&self) -> IndexerResult<Option<StoredMoveCallMetrics>> {
+        let latest_move_call_metrics = read_only_blocking!(&self.blocking_cp, |conn| {
+            move_call_metrics::dsl::move_call_metrics
+                .order(move_call_metrics::epoch.desc())
+                .first::<QueriedMoveCallMetrics>(conn)
+                .optional()
+        })
+        .unwrap_or_default();
+        Ok(latest_move_call_metrics.map(|m| m.into()))
+    }
+
+    fn persist_move_calls_in_tx_range(
+        &self,
+        start_tx_seq: i64,
+        end_tx_seq: i64,
+    ) -> IndexerResult<()> {
+        let move_call_persist_query = construct_move_call_persist_query(start_tx_seq, end_tx_seq);
+        transactional_blocking_with_retry!(
+            &self.blocking_cp,
+            |conn| {
+                diesel::sql_query(move_call_persist_query.clone()).execute(conn)?;
+                Ok::<(), IndexerError>(())
+            },
+            Duration::from_secs(10)
+        )
+        .context("Failed persisting move calls to PostgresDB")?;
+        Ok(())
+    }
+
+    async fn calculate_and_persist_move_call_metrics(&self, epoch: i64) -> IndexerResult<()> {
+        let move_call_query_3d = build_move_call_metric_query(epoch, 3);
+        let move_call_query_7d = build_move_call_metric_query(epoch, 7);
+        let move_call_query_30d = build_move_call_metric_query(epoch, 30);
+
+        let mut calculate_tasks = vec![];
+        let blocking_cp_3d = self.blocking_cp.clone();
+        calculate_tasks.push(tokio::task::spawn_blocking(move || {
+            read_only_blocking!(&blocking_cp_3d, |conn| {
+                diesel::sql_query(move_call_query_3d).get_results::<QueriedMoveMetrics>(conn)
+            })
+        }));
+        let blocking_cp_7d = self.blocking_cp.clone();
+        calculate_tasks.push(tokio::task::spawn_blocking(move || {
+            read_only_blocking!(&blocking_cp_7d, |conn| {
+                diesel::sql_query(move_call_query_7d).get_results::<QueriedMoveMetrics>(conn)
+            })
+        }));
+        let blocking_cp_30d = self.blocking_cp.clone();
+        calculate_tasks.push(tokio::task::spawn_blocking(move || {
+            read_only_blocking!(&blocking_cp_30d, |conn| {
+                diesel::sql_query(move_call_query_30d).get_results::<QueriedMoveMetrics>(conn)
+            })
+        }));
+        let chained = futures::future::join_all(calculate_tasks)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .tap_err(|e| {
+                error!("Error joining move call calculation tasks: {:?}", e);
+            })?
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .tap_err(|e| {
+                error!("Error calculating move call metrics: {:?}", e);
+            })?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+
+        let move_call_metrics: Vec<StoredMoveCallMetrics> = chained
+            .into_iter()
+            .filter_map(|queried_move_metrics| {
+                let package = ObjectID::from_bytes(queried_move_metrics.move_package.clone()).ok();
+                let package_str = match package {
+                    Some(p) => p.to_canonical_string(/* with_prefix */ true),
+                    None => {
+                        tracing::error!(
+                            "Failed to parse move package ID: {:?}",
+                            queried_move_metrics.move_package
+                        );
+                        return None;
+                    }
+                };
+                Some(StoredMoveCallMetrics {
+                    id: None,
+                    epoch,
+                    day: queried_move_metrics.day,
+                    move_package: package_str,
+                    move_module: queried_move_metrics.move_module,
+                    move_function: queried_move_metrics.move_function,
+                    count: queried_move_metrics.count,
+                })
+            })
+            .collect();
+
+        transactional_blocking_with_retry!(
+            &self.blocking_cp,
+            |conn| {
+                diesel::insert_into(move_call_metrics::table)
+                    .values(move_call_metrics.clone())
+                    .on_conflict_do_nothing()
+                    .execute(conn)
+            },
+            Duration::from_secs(60)
+        )
+        .context("Failed persisting move call metrics to PostgresDB")?;
+        Ok(())
+    }
+}
+
+fn construct_checkpoint_tx_count_query(start_checkpoint: i64, end_checkpoint: i64) -> String {
+    format!(
+        "With filtered_txns AS (
+            SELECT
+                t.checkpoint_sequence_number,
+                c.epoch,
+                t.timestamp_ms,
+                t.success_command_count
+            FROM transactions t
+            LEFT JOIN checkpoints c
+            ON t.checkpoint_sequence_number = c.sequence_number
+            WHERE t.checkpoint_sequence_number >= {} AND t.checkpoint_sequence_number < {}
+        )
+        INSERT INTO tx_count_metrics
+        SELECT
+            checkpoint_sequence_number,
+            epoch,
+            MAX(timestamp_ms) AS timestamp_ms,
+            COUNT(*) AS total_transaction_blocks,
+            SUM(CASE WHEN success_command_count > 0 THEN 1 ELSE 0 END) AS total_successful_transaction_blocks,
+            SUM(success_command_count) AS total_successful_transactions
+        FROM filtered_txns
+        GROUP BY checkpoint_sequence_number, epoch ORDER BY checkpoint_sequence_number
+        ON CONFLICT (checkpoint_sequence_number) DO NOTHING;
+        ", start_checkpoint, end_checkpoint
+    )
+}
+
+fn construct_peak_tps_query(epoch: i64, offset: i64) -> String {
+    format!(
+        "WITH filtered_checkpoints AS (
+            SELECT
+                MAX(checkpoint_sequence_number) AS checkpoint_sequence_number,
+                SUM(total_successful_transactions) AS total_successful_transactions,
+                timestamp_ms
+            FROM
+                tx_count_metrics
+            WHERE epoch > ({} - {}) AND epoch <= {}
+            GROUP BY
+                timestamp_ms
+        ),
+        tps_data AS (
+            SELECT
+                checkpoint_sequence_number,
+                total_successful_transactions,
+                timestamp_ms - LAG(timestamp_ms) OVER (ORDER BY timestamp_ms) AS time_diff
+            FROM
+                filtered_checkpoints
+        )
+        SELECT
+            MAX(total_successful_transactions * 1000.0 / time_diff)::float8 as peak_tps
+        FROM
+            tps_data
+        WHERE
+            time_diff IS NOT NULL;
+        ",
+        epoch, offset, epoch
+    )
+}
+
+fn construct_address_persisting_query(start_tx_seq: i64, end_tx_seq: i64) -> String {
+    format!(
+        "WITH senders AS (
+            SELECT
+                s.sender AS address,
+                s.tx_sequence_number,
+                t.timestamp_ms
+            FROM tx_senders s
+            JOIN transactions t
+            ON s.tx_sequence_number = t.tx_sequence_number
+            WHERE s.tx_sequence_number >= {} AND s.tx_sequence_number < {}
+        ),
+        recipients AS (
+            SELECT
+                r.recipient AS address,
+                r.tx_sequence_number,
+                t.timestamp_ms
+            FROM tx_recipients r
+            JOIN transactions t
+            ON r.tx_sequence_number = t.tx_sequence_number
+            WHERE r.tx_sequence_number >= {} AND r.tx_sequence_number < {}
+        ),
+        union_address AS (
+            SELECT
+                address,
+                MIN(tx_sequence_number) as first_seq,
+                MIN(timestamp_ms) AS first_timestamp,
+                MAX(tx_sequence_number) as last_seq,
+                MAX(timestamp_ms) AS last_timestamp
+            FROM recipients GROUP BY address
+            UNION ALL
+            SELECT
+                address,
+                MIN(tx_sequence_number) as first_seq,
+                MIN(timestamp_ms) AS first_timestamp,
+                MAX(tx_sequence_number) as last_seq,
+                MAX(timestamp_ms) AS last_timestamp
+            FROM senders GROUP BY address
+        )
+        INSERT INTO addresses
+        SELECT
+            address,
+            MIN(first_seq) AS first_appearance_tx,
+            MIN(first_timestamp) AS first_appearance_time,
+            MAX(last_seq) AS last_appearance_tx,
+            MAX(last_timestamp) AS last_appearance_time
+        FROM union_address
+        GROUP BY address
+        ON CONFLICT (address) DO UPDATE
+        SET
+            last_appearance_tx = GREATEST(EXCLUDED.last_appearance_tx, addresses.last_appearance_tx),
+            last_appearance_time = GREATEST(EXCLUDED.last_appearance_time, addresses.last_appearance_time);
+        ",
+        start_tx_seq, end_tx_seq, start_tx_seq, end_tx_seq
+    )
+}
+
+fn construct_active_address_persisting_query(start_tx_seq: i64, end_tx_seq: i64) -> String {
+    format!(
+        "WITH senders AS (
+            SELECT
+                s.sender AS address,
+                s.tx_sequence_number,
+                t.timestamp_ms
+            FROM tx_senders s
+            JOIN transactions t
+            ON s.tx_sequence_number = t.tx_sequence_number
+            WHERE s.tx_sequence_number >= {} AND s.tx_sequence_number < {}
+        )
+        INSERT INTO active_addresses
+        SELECT
+            address,
+            MIN(tx_sequence_number) AS first_appearance_tx,
+            MIN(timestamp_ms) AS first_appearance_time,
+            MAX(tx_sequence_number) AS last_appearance_tx,
+            MAX(timestamp_ms) AS last_appearance_time
+        FROM senders
+        GROUP BY address
+        ON CONFLICT (address) DO UPDATE
+        SET
+            last_appearance_tx = GREATEST(EXCLUDED.last_appearance_tx, active_addresses.last_appearance_tx),
+            last_appearance_time = GREATEST(EXCLUDED.last_appearance_time, active_addresses.last_appearance_time);
+        ",
+        start_tx_seq, end_tx_seq
+    )
+}
+
+fn construct_move_call_persist_query(start_tx_seq: i64, end_tx_seq: i64) -> String {
+    format!(
+        "INSERT INTO move_calls
+        SELECT
+            m.tx_sequence_number AS transaction_sequence_number,
+            c.sequence_number AS checkpoint_sequence_number,
+            c.epoch AS epoch,
+            m.package AS move_package,
+            m.module AS move_module,
+            m.func AS move_function
+        FROM tx_calls m
+        INNER JOIN transactions t
+        ON m.tx_sequence_number = t.tx_sequence_number
+        INNER JOIN checkpoints c
+        ON t.checkpoint_sequence_number = c.sequence_number
+        WHERE m.tx_sequence_number >= {} AND m.tx_sequence_number < {}
+        ON CONFLICT (transaction_sequence_number, move_package, move_module, move_function) DO NOTHING;
+        ",
+        start_tx_seq, end_tx_seq
+    )
+}
\ No newline at end of file