Skip to content

Commit

Permalink
feat(iota-json-rpc-api): Add metric processors back
Browse files Browse the repository at this point in the history
  • Loading branch information
samuel-rufi committed Aug 14, 2024
1 parent 15f157c commit 903b920
Show file tree
Hide file tree
Showing 18 changed files with 1,671 additions and 1 deletion.
124 changes: 124 additions & 0 deletions crates/iota-indexer/src/handlers/address_metrics_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use tap::tap::TapFallible;
use tracing::{error, info};

use crate::metrics::IndexerMetrics;
use crate::store::IndexerAnalyticalStore;
use crate::types::IndexerResult;

const ADDRESS_PROCESSOR_BATCH_SIZE: usize = 80000;
const PARALLELISM: usize = 10;

pub struct AddressMetricsProcessor<S> {
pub store: S,
metrics: IndexerMetrics,
pub address_processor_batch_size: usize,
pub address_processor_parallelism: usize,
}

impl<S> AddressMetricsProcessor<S>
where
S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
{
pub fn new(store: S, metrics: IndexerMetrics) -> AddressMetricsProcessor<S> {
let address_processor_batch_size = std::env::var("ADDRESS_PROCESSOR_BATCH_SIZE")
.map(|s| s.parse::<usize>().unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE))
.unwrap_or(ADDRESS_PROCESSOR_BATCH_SIZE);
let address_processor_parallelism = std::env::var("ADDRESS_PROCESSOR_PARALLELISM")
.map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
.unwrap_or(PARALLELISM);
Self {
store,
metrics,
address_processor_batch_size,
address_processor_parallelism,
}
}

pub async fn start(&self) -> IndexerResult<()> {
info!("Indexer address metrics async processor started...");
let latest_tx_seq = self
.store
.get_address_metrics_last_processed_tx_seq()
.await?;
let mut last_processed_tx_seq = latest_tx_seq.unwrap_or_default().seq;
loop {
let mut latest_tx = self.store.get_latest_stored_transaction().await?;
while if let Some(tx) = latest_tx {
tx.tx_sequence_number
< last_processed_tx_seq + self.address_processor_batch_size as i64
} else {
true
} {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
latest_tx = self.store.get_latest_stored_transaction().await?;
}

let mut persist_tasks = vec![];
let batch_size = self.address_processor_batch_size;
let step_size = batch_size / self.address_processor_parallelism;
for chunk_start_tx_seq in (last_processed_tx_seq + 1
..last_processed_tx_seq + batch_size as i64 + 1)
.step_by(step_size)
{
let active_address_store = self.store.clone();
persist_tasks.push(tokio::task::spawn_blocking(move || {
active_address_store.persist_active_addresses_in_tx_range(
chunk_start_tx_seq,
chunk_start_tx_seq + step_size as i64,
)
}));
}
for chunk_start_tx_seq in (last_processed_tx_seq + 1
..last_processed_tx_seq + batch_size as i64 + 1)
.step_by(step_size)
{
let address_store = self.store.clone();
persist_tasks.push(tokio::task::spawn_blocking(move || {
address_store.persist_addresses_in_tx_range(
chunk_start_tx_seq,
chunk_start_tx_seq + step_size as i64,
)
}));
}
futures::future::join_all(persist_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.tap_err(|e| {
error!("Error joining address persist tasks: {:?}", e);
})?
.into_iter()
.collect::<Result<Vec<_>, _>>()
.tap_err(|e| {
error!("Error persisting addresses or active addresses: {:?}", e);
})?;
last_processed_tx_seq += self.address_processor_batch_size as i64;
info!(
"Persisted addresses and active addresses for tx seq: {}",
last_processed_tx_seq,
);
self.metrics
.latest_address_metrics_tx_seq
.set(last_processed_tx_seq);

let mut last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?;
while last_processed_tx.is_none() {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
last_processed_tx = self.store.get_tx(last_processed_tx_seq).await?;
}
// unwrap is safe here b/c we just checked that it's not None
let last_processed_cp = last_processed_tx.unwrap().checkpoint_sequence_number;
self.store
.calculate_and_persist_address_metrics(last_processed_cp)
.await?;
info!(
"Persisted address metrics for checkpoint: {}",
last_processed_cp
);
}
}
}
4 changes: 4 additions & 0 deletions crates/iota-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ pub mod checkpoint_handler;
pub mod committer;
pub mod objects_snapshot_processor;
pub mod tx_processor;
pub mod address_metrics_processor;
pub mod move_call_metrics_processor;
pub mod network_metrics_processor;
pub mod processor_orchestrator;

#[derive(Debug)]
pub struct CheckpointDataToCommit {
Expand Down
113 changes: 113 additions & 0 deletions crates/iota-indexer/src/handlers/move_call_metrics_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use tap::tap::TapFallible;
use tracing::{error, info};

use crate::metrics::IndexerMetrics;
use crate::store::IndexerAnalyticalStore;
use crate::types::IndexerResult;

const MOVE_CALL_PROCESSOR_BATCH_SIZE: usize = 80000;
const PARALLELISM: usize = 10;

pub struct MoveCallMetricsProcessor<S> {
pub store: S,
metrics: IndexerMetrics,
pub move_call_processor_batch_size: usize,
pub move_call_processor_parallelism: usize,
}

impl<S> MoveCallMetricsProcessor<S>
where
S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
{
pub fn new(store: S, metrics: IndexerMetrics) -> MoveCallMetricsProcessor<S> {
let move_call_processor_batch_size = std::env::var("MOVE_CALL_PROCESSOR_BATCH_SIZE")
.map(|s| s.parse::<usize>().unwrap_or(MOVE_CALL_PROCESSOR_BATCH_SIZE))
.unwrap_or(MOVE_CALL_PROCESSOR_BATCH_SIZE);
let move_call_processor_parallelism = std::env::var("MOVE_CALL_PROCESSOR_PARALLELISM")
.map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
.unwrap_or(PARALLELISM);
Self {
store,
metrics,
move_call_processor_batch_size,
move_call_processor_parallelism,
}
}

pub async fn start(&self) -> IndexerResult<()> {
info!("Indexer move call metrics async processor started...");
let latest_move_call_tx_seq = self.store.get_latest_move_call_tx_seq().await?;
let mut last_processed_tx_seq = latest_move_call_tx_seq.unwrap_or_default().seq;
let latest_move_call_epoch = self.store.get_latest_move_call_metrics().await?;
let mut last_processed_epoch = latest_move_call_epoch.unwrap_or_default().epoch;
loop {
let mut latest_tx = self.store.get_latest_stored_transaction().await?;
while if let Some(tx) = latest_tx {
tx.tx_sequence_number
< last_processed_tx_seq + self.move_call_processor_batch_size as i64
} else {
true
} {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
latest_tx = self.store.get_latest_stored_transaction().await?;
}

let batch_size = self.move_call_processor_batch_size;
let step_size = batch_size / self.move_call_processor_parallelism;
let mut persist_tasks = vec![];
for chunk_start_tx_seq in (last_processed_tx_seq + 1
..last_processed_tx_seq + batch_size as i64 + 1)
.step_by(step_size)
{
let move_call_store = self.store.clone();
persist_tasks.push(tokio::task::spawn_blocking(move || {
move_call_store.persist_move_calls_in_tx_range(
chunk_start_tx_seq,
chunk_start_tx_seq + step_size as i64,
)
}));
}
futures::future::join_all(persist_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.tap_err(|e| {
error!("Error joining move call persist tasks: {:?}", e);
})?
.into_iter()
.collect::<Result<Vec<_>, _>>()
.tap_err(|e| {
error!("Error persisting move calls: {:?}", e);
})?;
last_processed_tx_seq += batch_size as i64;
info!("Persisted move_calls at tx seq: {}", last_processed_tx_seq);
self.metrics
.latest_move_call_metrics_tx_seq
.set(last_processed_tx_seq);

let mut tx = self.store.get_tx(last_processed_tx_seq).await?;
while tx.is_none() {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
tx = self.store.get_tx(last_processed_tx_seq).await?;
}
let cp_seq = tx.unwrap().checkpoint_sequence_number;
let mut cp = self.store.get_cp(cp_seq).await?;
while cp.is_none() {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
cp = self.store.get_cp(cp_seq).await?;
}
let end_epoch = cp.unwrap().epoch;
for epoch in last_processed_epoch + 1..end_epoch {
self.store
.calculate_and_persist_move_call_metrics(epoch)
.await?;
info!("Persisted move_call_metrics for epoch: {}", epoch);
}
last_processed_epoch = end_epoch - 1;
}
}
}
129 changes: 129 additions & 0 deletions crates/iota-indexer/src/handlers/network_metrics_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use tap::tap::TapFallible;
use tracing::{error, info};

use crate::errors::IndexerError;
use crate::metrics::IndexerMetrics;
use crate::store::IndexerAnalyticalStore;
use crate::types::IndexerResult;

const NETWORK_METRICS_PROCESSOR_BATCH_SIZE: usize = 10;
const PARALLELISM: usize = 1;

pub struct NetworkMetricsProcessor<S> {
pub store: S,
metrics: IndexerMetrics,
pub network_processor_metrics_batch_size: usize,
pub network_processor_metrics_parallelism: usize,
}

impl<S> NetworkMetricsProcessor<S>
where
S: IndexerAnalyticalStore + Clone + Sync + Send + 'static,
{
pub fn new(store: S, metrics: IndexerMetrics) -> NetworkMetricsProcessor<S> {
let network_processor_metrics_batch_size =
std::env::var("NETWORK_PROCESSOR_METRICS_BATCH_SIZE")
.map(|s| {
s.parse::<usize>()
.unwrap_or(NETWORK_METRICS_PROCESSOR_BATCH_SIZE)
})
.unwrap_or(NETWORK_METRICS_PROCESSOR_BATCH_SIZE);
let network_processor_metrics_parallelism =
std::env::var("NETWORK_PROCESSOR_METRICS_PARALLELISM")
.map(|s| s.parse::<usize>().unwrap_or(PARALLELISM))
.unwrap_or(PARALLELISM);
Self {
store,
metrics,
network_processor_metrics_batch_size,
network_processor_metrics_parallelism,
}
}

pub async fn start(&self) -> IndexerResult<()> {
info!("Indexer network metrics async processor started...");
let latest_tx_count_metrics = self
.store
.get_latest_tx_count_metrics()
.await
.unwrap_or_default();
let latest_epoch_peak_tps = self
.store
.get_latest_epoch_peak_tps()
.await
.unwrap_or_default();
let mut last_processed_cp_seq = latest_tx_count_metrics
.unwrap_or_default()
.checkpoint_sequence_number;
let mut last_processed_peak_tps_epoch = latest_epoch_peak_tps.unwrap_or_default().epoch;
loop {
let mut latest_stored_checkpoint = self.store.get_latest_stored_checkpoint().await?;
while if let Some(cp) = latest_stored_checkpoint {
cp.sequence_number
< last_processed_cp_seq + self.network_processor_metrics_batch_size as i64
} else {
true
} {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
latest_stored_checkpoint = self.store.get_latest_stored_checkpoint().await?;
}

info!(
"Persisting tx count metrics for checkpoint sequence number {}",
last_processed_cp_seq
);
let batch_size = self.network_processor_metrics_batch_size;
let step_size = batch_size / self.network_processor_metrics_parallelism;
let mut persist_tasks = vec![];
for chunk_start_cp in (last_processed_cp_seq + 1
..last_processed_cp_seq + batch_size as i64 + 1)
.step_by(step_size)
{
let store = self.store.clone();
persist_tasks.push(tokio::task::spawn_blocking(move || {
store
.persist_tx_count_metrics(chunk_start_cp, chunk_start_cp + step_size as i64)
}));
}
futures::future::join_all(persist_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()
.tap_err(|e| {
error!("Error joining network persist tasks: {:?}", e);
})?
.into_iter()
.collect::<Result<Vec<_>, _>>()
.tap_err(|e| {
error!("Error persisting tx count metrics: {:?}", e);
})?;
last_processed_cp_seq += batch_size as i64;
info!(
"Persisted tx count metrics for checkpoint sequence number {}",
last_processed_cp_seq
);
self.metrics
.latest_network_metrics_cp_seq
.set(last_processed_cp_seq);

let end_cp = self
.store
.get_checkpoints_in_range(last_processed_cp_seq, last_processed_cp_seq + 1)
.await?
.first()
.ok_or(IndexerError::PostgresReadError(
"Cannot read checkpoint from PG for epoch peak TPS".to_string(),
))?
.clone();
for epoch in last_processed_peak_tps_epoch + 1..end_cp.epoch {
self.store.persist_epoch_peak_tps(epoch).await?;
last_processed_peak_tps_epoch = epoch;
info!("Persisted epoch peak TPS for epoch {}", epoch);
}
}
}
}
Loading

0 comments on commit 903b920

Please sign in to comment.