Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iota-json-rpc-api): Restore explorer metric endpoints #1845

Merged
merged 16 commits into from
Aug 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/iota-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
use std::{net::SocketAddr, sync::Arc, time::Duration};

use iota_graphql_rpc_client::simple_client::SimpleClient;
pub use iota_indexer::handlers::objects_snapshot_processor::SnapshotLagConfig;
pub use iota_indexer::processors::objects_snapshot_processor::SnapshotLagConfig;
use iota_indexer::{
errors::IndexerError,
store::{indexer_store::IndexerStore, PgIndexerStore},
86 changes: 85 additions & 1 deletion crates/iota-indexer/src/apis/extended_api.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,8 @@ use iota_json_rpc_api::{
internal_error, validate_limit, ExtendedApiServer, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS,
};
use iota_json_rpc_types::{
CheckpointedObjectID, EpochInfo, EpochPage, IotaObjectResponseQuery, Page, QueryObjectsPage,
AddressMetrics, CheckpointedObjectID, EpochInfo, EpochMetrics, EpochMetricsPage, EpochPage,
IotaObjectResponseQuery, MoveCallMetrics, NetworkMetrics, Page, QueryObjectsPage,
};
use iota_open_rpc::Module;
use iota_types::iota_serde::BigInt;
@@ -56,6 +57,46 @@ impl ExtendedApiServer for ExtendedApi {
})
}

async fn get_epoch_metrics(
&self,
cursor: Option<BigInt<u64>>,
limit: Option<usize>,
descending_order: Option<bool>,
) -> RpcResult<EpochMetricsPage> {
let limit =
validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS).map_err(internal_error)?;
let epochs = self
.inner
.spawn_blocking(move |this| {
this.get_epochs(
cursor.map(|x| *x),
limit + 1,
descending_order.unwrap_or(false),
)
})
.await?;

let mut epoch_metrics = epochs
.into_iter()
.map(|e| EpochMetrics {
epoch: e.epoch,
epoch_total_transactions: e.epoch_total_transactions,
first_checkpoint_id: e.first_checkpoint_id,
epoch_start_timestamp: e.epoch_start_timestamp,
end_of_epoch_info: e.end_of_epoch_info,
})
.collect::<Vec<_>>();

let has_next_page = epoch_metrics.len() > limit;
epoch_metrics.truncate(limit);
let next_cursor = epoch_metrics.last().map(|e| e.epoch);
Ok(Page {
data: epoch_metrics,
next_cursor: next_cursor.map(|id| id.into()),
has_next_page,
})
}

async fn get_current_epoch(&self) -> RpcResult<EpochInfo> {
let stored_epoch = self
.inner
@@ -64,6 +105,49 @@ impl ExtendedApiServer for ExtendedApi {
EpochInfo::try_from(stored_epoch).map_err(Into::into)
}

async fn get_network_metrics(&self) -> RpcResult<NetworkMetrics> {
let network_metrics = self
.inner
.spawn_blocking(|this| this.get_latest_network_metrics())
.await?;
Ok(network_metrics)
}

async fn get_move_call_metrics(&self) -> RpcResult<MoveCallMetrics> {
let move_call_metrics = self
.inner
.spawn_blocking(|this| this.get_latest_move_call_metrics())
.await?;
Ok(move_call_metrics)
}

async fn get_latest_address_metrics(&self) -> RpcResult<AddressMetrics> {
let latest_address_metrics = self
.inner
.spawn_blocking(|this| this.get_latest_address_metrics())
.await?;
Ok(latest_address_metrics)
}

async fn get_checkpoint_address_metrics(&self, checkpoint: u64) -> RpcResult<AddressMetrics> {
let checkpoint_address_metrics = self
.inner
.spawn_blocking(move |this| this.get_checkpoint_address_metrics(checkpoint))
.await?;
Ok(checkpoint_address_metrics)
}

async fn get_all_epoch_address_metrics(
&self,
descending_order: Option<bool>,
) -> RpcResult<Vec<AddressMetrics>> {
let all_epoch_address_metrics = self
.inner
.spawn_blocking(move |this| this.get_all_epoch_address_metrics(descending_order))
.await?;
Ok(all_epoch_address_metrics)
}

async fn query_objects(
&self,
_query: IotaObjectResponseQuery,
1 change: 0 additions & 1 deletion crates/iota-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -14,7 +14,6 @@ use crate::{

pub mod checkpoint_handler;
pub mod committer;
pub mod objects_snapshot_processor;
pub mod tx_processor;

#[derive(Debug)]
24 changes: 19 additions & 5 deletions crates/iota-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -13,13 +13,14 @@ use crate::{
build_json_rpc_server,
errors::IndexerError,
framework::fetcher::CheckpointFetcher,
handlers::{
checkpoint_handler::new_handlers,
objects_snapshot_processor::{ObjectsSnapshotProcessor, SnapshotLagConfig},
},
handlers::checkpoint_handler::new_handlers,
indexer_reader::IndexerReader,
metrics::IndexerMetrics,
store::IndexerStore,
processors::{
objects_snapshot_processor::{ObjectsSnapshotProcessor, SnapshotLagConfig},
processor_orchestrator::ProcessorOrchestrator,
},
store::{IndexerStore, PgIndexerAnalyticalStore},
IndexerConfig,
};

@@ -115,4 +116,17 @@ impl Indexer {

Ok(())
}

pub async fn start_analytical_worker(
store: PgIndexerAnalyticalStore,
metrics: IndexerMetrics,
) -> Result<(), IndexerError> {
info!(
"Iota Indexer Analytical Worker (version {:?}) started...",
env!("CARGO_PKG_VERSION")
);
let mut processor_orchestrator = ProcessorOrchestrator::new(store, metrics);
processor_orchestrator.run_forever().await;
Ok(())
}
}
126 changes: 122 additions & 4 deletions crates/iota-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
@@ -15,9 +15,10 @@ use diesel::{
};
use fastcrypto::encoding::{Encoding, Hex};
use iota_json_rpc_types::{
Balance, CheckpointId, Coin as IotaCoin, DisplayFieldsResponse, EpochInfo, EventFilter,
IotaCoinMetadata, IotaEvent, IotaObjectDataFilter, IotaTransactionBlockEffects,
IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse, TransactionFilter,
AddressMetrics, Balance, CheckpointId, Coin as IotaCoin, DisplayFieldsResponse, EpochInfo,
EventFilter, IotaCoinMetadata, IotaEvent, IotaObjectDataFilter, IotaTransactionBlockEffects,
IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse, MoveCallMetrics,
MoveFunctionName, NetworkMetrics, TransactionFilter,
};
use iota_types::{
balance::Supply,
@@ -39,17 +40,21 @@ use crate::{
db::{PgConnectionConfig, PgConnectionPoolConfig, PgPoolConnection},
errors::IndexerError,
models::{
address_metrics::StoredAddressMetrics,
checkpoints::StoredCheckpoint,
display::StoredDisplay,
epoch::StoredEpochInfo,
events::StoredEvent,
move_call_metrics::QueriedMoveCallMetrics,
network_metrics::StoredNetworkMetrics,
objects::{CoinBalance, ObjectRefColumn, StoredObject},
packages::StoredPackage,
transactions::StoredTransaction,
tx_indices::TxSequenceNumber,
},
schema::{
checkpoints, display, epochs, events, objects, objects_snapshot, packages, transactions,
address_metrics, checkpoints, display, epochs, events, move_call_metrics, objects,
objects_snapshot, packages, transactions,
},
types::{IndexerResult, OwnerType},
};
@@ -1510,6 +1515,119 @@ impl IndexerReader {
.collect::<IndexerResult<Vec<_>>>()
}

pub fn get_latest_network_metrics(&self) -> IndexerResult<NetworkMetrics> {
let metrics = self.run_query(|conn| {
diesel::sql_query("SELECT * FROM network_metrics;")
.get_result::<StoredNetworkMetrics>(conn)
})?;
Ok(metrics.into())
}

pub fn get_latest_move_call_metrics(&self) -> IndexerResult<MoveCallMetrics> {
let latest_3d_move_call_metrics = self.run_query(|conn| {
move_call_metrics::table
.filter(move_call_metrics::dsl::day.eq(3))
.order(move_call_metrics::dsl::id.desc())
.limit(10)
.load::<QueriedMoveCallMetrics>(conn)
})?;
let latest_7d_move_call_metrics = self.run_query(|conn| {
move_call_metrics::table
.filter(move_call_metrics::dsl::day.eq(7))
.order(move_call_metrics::dsl::id.desc())
.limit(10)
.load::<QueriedMoveCallMetrics>(conn)
})?;
let latest_30d_move_call_metrics = self.run_query(|conn| {
move_call_metrics::table
.filter(move_call_metrics::dsl::day.eq(30))
.order(move_call_metrics::dsl::id.desc())
.limit(10)
.load::<QueriedMoveCallMetrics>(conn)
})?;

let latest_3_days: Vec<(MoveFunctionName, usize)> = latest_3d_move_call_metrics
.into_iter()
.map(|m| m.try_into())
.collect::<Result<Vec<_>, _>>()?;
let latest_7_days: Vec<(MoveFunctionName, usize)> = latest_7d_move_call_metrics
.into_iter()
.map(|m| m.try_into())
.collect::<Result<Vec<_>, _>>()?;
let latest_30_days: Vec<(MoveFunctionName, usize)> = latest_30d_move_call_metrics
.into_iter()
.map(|m| m.try_into())
.collect::<Result<Vec<_>, _>>()?;
// sort by call count desc.
let rank_3_days = latest_3_days
.into_iter()
.sorted_by(|a, b| b.1.cmp(&a.1))
.collect::<Vec<_>>();
let rank_7_days = latest_7_days
.into_iter()
.sorted_by(|a, b| b.1.cmp(&a.1))
.collect::<Vec<_>>();
let rank_30_days = latest_30_days
.into_iter()
.sorted_by(|a, b| b.1.cmp(&a.1))
.collect::<Vec<_>>();
Ok(MoveCallMetrics {
rank_3_days,
rank_7_days,
rank_30_days,
})
}

pub fn get_latest_address_metrics(&self) -> IndexerResult<AddressMetrics> {
let stored_address_metrics = self.run_query(|conn| {
address_metrics::table
.order(address_metrics::dsl::checkpoint.desc())
.first::<StoredAddressMetrics>(conn)
})?;
Ok(stored_address_metrics.into())
}

pub fn get_checkpoint_address_metrics(
&self,
checkpoint_seq: u64,
) -> IndexerResult<AddressMetrics> {
let stored_address_metrics = self.run_query(|conn| {
address_metrics::table
.filter(address_metrics::dsl::checkpoint.eq(checkpoint_seq as i64))
.first::<StoredAddressMetrics>(conn)
})?;
Ok(stored_address_metrics.into())
}

pub fn get_all_epoch_address_metrics(
&self,
descending_order: Option<bool>,
) -> IndexerResult<Vec<AddressMetrics>> {
let is_descending = descending_order.unwrap_or_default();
let epoch_address_metrics_query = format!(
"WITH ranked_rows AS (
SELECT
checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses,
row_number() OVER(PARTITION BY epoch ORDER BY checkpoint DESC) as row_num
FROM
address_metrics
)
SELECT
checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses
FROM ranked_rows
WHERE row_num = 1 ORDER BY epoch {}",
if is_descending { "DESC" } else { "ASC" },
);
let epoch_address_metrics = self.run_query(|conn| {
diesel::sql_query(epoch_address_metrics_query).load::<StoredAddressMetrics>(conn)
})?;

Ok(epoch_address_metrics
.into_iter()
.map(|stored_address_metrics| stored_address_metrics.into())
.collect())
}

pub(crate) async fn get_display_fields(
&self,
original_object: &iota_types::object::Object,
4 changes: 4 additions & 0 deletions crates/iota-indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ pub mod indexer;
pub mod indexer_reader;
pub mod metrics;
pub mod models;
pub mod processors;
pub mod schema;
pub mod store;
pub mod test_utils;
@@ -74,6 +75,8 @@ pub struct IndexerConfig {
pub fullnode_sync_worker: bool,
#[clap(long)]
pub rpc_server_worker: bool,
#[clap(long)]
pub analytical_worker: bool,
}

impl IndexerConfig {
@@ -136,6 +139,7 @@ impl Default for IndexerConfig {
reset_db: false,
fullnode_sync_worker: true,
rpc_server_worker: true,
analytical_worker: false,
}
}
}
5 changes: 4 additions & 1 deletion crates/iota-indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ use iota_indexer::{
errors::IndexerError,
indexer::Indexer,
metrics::{start_prometheus_server, IndexerMetrics},
store::PgIndexerStore,
store::{PgIndexerAnalyticalStore, PgIndexerStore},
IndexerConfig,
};
use tracing::{error, info};
@@ -92,6 +92,9 @@ async fn main() -> Result<(), IndexerError> {
return Indexer::start_writer(&indexer_config, store, indexer_metrics).await;
} else if indexer_config.rpc_server_worker {
return Indexer::start_reader(&indexer_config, &registry, db_url).await;
} else if indexer_config.analytical_worker {
let store = PgIndexerAnalyticalStore::new(blocking_cp);
return Indexer::start_analytical_worker(store, indexer_metrics.clone()).await;
}
Ok(())
}
21 changes: 20 additions & 1 deletion crates/iota-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -91,7 +91,11 @@ pub struct IndexerMetrics {
pub latest_tx_checkpoint_sequence_number: IntGauge,
pub latest_indexer_object_checkpoint_sequence_number: IntGauge,
pub latest_object_snapshot_sequence_number: IntGauge,
// checkpoint E2E latency is:
// Analytical
pub latest_move_call_metrics_tx_seq: IntGauge,
pub latest_address_metrics_tx_seq: IntGauge,
pub latest_network_metrics_cp_seq: IntGauge,
// Checkpoint E2E latency is:
// fullnode_download_latency + checkpoint_index_latency + db_commit_latency
pub checkpoint_download_bytes_size: IntGauge,
pub fullnode_checkpoint_data_download_latency: Histogram,
@@ -241,6 +245,21 @@ impl IndexerMetrics {
"Latest object snapshot sequence number from the Indexer",
registry,
).unwrap(),
latest_move_call_metrics_tx_seq: register_int_gauge_with_registry!(
"latest_move_call_metrics_tx_seq",
"Latest move call metrics tx seq",
registry,
).unwrap(),
latest_address_metrics_tx_seq: register_int_gauge_with_registry!(
"latest_address_metrics_tx_seq",
"Latest address metrics tx seq",
registry,
).unwrap(),
latest_network_metrics_cp_seq: register_int_gauge_with_registry!(
"latest_network_metrics_cp_seq",
"Latest network metrics cp seq",
registry,
).unwrap(),
checkpoint_download_bytes_size: register_int_gauge_with_registry!(
"checkpoint_download_bytes_size",
"Size of the downloaded checkpoint in bytes",
Loading