Skip to content

Commit

Permalink
feat(iota-json-rpc-api): Merge explorer metrics feature branch (#2259)
Browse files Browse the repository at this point in the history
* feat(iota-json-rpc-api): Restore explorer metric endpoints (#1845)

* feat(iota-json-rpc-api): Add explorer endpoints back

* feat(iota-json-rpc-api): Add metric processors back

* refactor(iota-indexer): fix schema usages

* refactor(iota-indexer): redo `ExtendedApi` methods

* refactor(iota-indexer): Introduce analytical-worker in docker-compose

* refactor(iota-benchmark, iota-core): Fix format

* refactor: Adjust docker-compose params and ports

* refactor: Add missing documentation

* refactor: Explicit export

Co-authored-by: Konstantinos Demartinos <[email protected]>

* refactor: Move processors

* refactor: Regenerate OpenRPC spec

* refactor: Add license

* refactor: Fix import

* refactor: Remove outdated comment

* refactor: Add comments to clarify `address` vs `active_address` meaning

---------

Co-authored-by: Konstantinos Demartinos <[email protected]>

* fix: Remove unimplemented ExtendedApi::query_objects

* fix: fmt

* feat(tooling-sdk): Sync TS-SDK APIs (#2250)

* fix(tooling-sdk): Run generate scripts

* fix(tooling-sdk): Update manual types

* fix(tooling-sdk): Regenerate types again (after rebase)

* fix(tooling-sdk): Regenerate types again (after base branch fixes)

* fix(tooling-explorer): Remove usage of removed type (name service) and mock the client methods for now

* fix(tooling-sdk): Ignore unused arguments in client without ts-ignore

* fix(tooling-sdk): Fix build issues triggered by type changes

* fix(tooling-sdk): Support newly added methods in client.ts

* fix(tooling-sdk): Run codegen in graphql-transport

* fix(tooling-sdk): Add graphql compatibility tests for new methods (skip them as they don't pass atm)

* fix(tooling-sdk): Remove nameservice related methods from graphql-transport

* fix(tooling-sdk): Add changeset for sdk & graphql-transport packages

* fix(tooling-sdk): Make dprint like the changeset format

* fix(tooling-sdk): Make dprint like the changeset format

* feat: fmt

---------

Co-authored-by: Konstantinos Demartinos <[email protected]>
Co-authored-by: Mario <[email protected]>
  • Loading branch information
3 people authored Sep 9, 2024
1 parent dfdeda1 commit d423314
Show file tree
Hide file tree
Showing 39 changed files with 2,883 additions and 783 deletions.
9 changes: 9 additions & 0 deletions .changeset/late-buses-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@iota/graphql-transport': patch
'@iota/iota-sdk': patch
---

Sync API changes:

- restore extended api metrics endpoints
- remove nameservice endpoints
4 changes: 2 additions & 2 deletions apps/explorer/src/components/AddressesCardGraph.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ export function AddressesCardGraph(): JSX.Element {
data={adjEpochAddressMetrics}
height={height}
width={width}
getX={({ epoch }) => epoch}
getY={(data) => data[GRAPH_DATA_FIELD]}
getX={({ epoch }) => Number(epoch) || 0}
getY={(data) => Number(data[GRAPH_DATA_FIELD]) || 0}
formatY={formatAmount}
tooltipContent={TooltipContent}
/>
Expand Down
2 changes: 1 addition & 1 deletion apps/explorer/src/lib/utils/getStorageFundFlow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ interface StorageFundFlow {
fundOutflow: bigint | null;
}

export function getEpochStorageFundFlow(endOfEpochInfo: EndOfEpochInfo | null): StorageFundFlow {
export function getEpochStorageFundFlow(endOfEpochInfo?: EndOfEpochInfo | null): StorageFundFlow {
const fundInflow = endOfEpochInfo ? BigInt(endOfEpochInfo.storageCharge) : null;

const fundOutflow = endOfEpochInfo ? BigInt(endOfEpochInfo.storageRebate) : null;
Expand Down
4 changes: 3 additions & 1 deletion apps/explorer/src/lib/utils/getSupplyChangeAfterEpochEnd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

import { type EndOfEpochInfo } from '@iota/iota-sdk/src/client';

export function getSupplyChangeAfterEpochEnd(endOfEpochInfo: EndOfEpochInfo | null): bigint | null {
export function getSupplyChangeAfterEpochEnd(
endOfEpochInfo?: EndOfEpochInfo | null,
): bigint | null {
if (endOfEpochInfo?.mintedTokensAmount == null || endOfEpochInfo?.burntTokensAmount == null)
return null;

Expand Down
2 changes: 1 addition & 1 deletion crates/iota-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use std::{net::SocketAddr, sync::Arc, time::Duration};

use iota_graphql_rpc_client::simple_client::SimpleClient;
pub use iota_indexer::handlers::objects_snapshot_processor::SnapshotLagConfig;
pub use iota_indexer::processors::objects_snapshot_processor::SnapshotLagConfig;
use iota_indexer::{
errors::IndexerError,
store::{indexer_store::IndexerStore, PgIndexerStore},
Expand Down
89 changes: 82 additions & 7 deletions crates/iota-indexer/src/apis/extended_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use iota_json_rpc_api::{
internal_error, validate_limit, ExtendedApiServer, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS,
};
use iota_json_rpc_types::{
CheckpointedObjectID, EpochInfo, EpochPage, IotaObjectResponseQuery, Page, QueryObjectsPage,
AddressMetrics, EpochInfo, EpochMetrics, EpochMetricsPage, EpochPage, MoveCallMetrics,
NetworkMetrics, Page,
};
use iota_open_rpc::Module;
use iota_types::iota_serde::BigInt;
Expand Down Expand Up @@ -56,6 +57,46 @@ impl ExtendedApiServer for ExtendedApi {
})
}

async fn get_epoch_metrics(
&self,
cursor: Option<BigInt<u64>>,
limit: Option<usize>,
descending_order: Option<bool>,
) -> RpcResult<EpochMetricsPage> {
let limit =
validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS).map_err(internal_error)?;
let epochs = self
.inner
.spawn_blocking(move |this| {
this.get_epochs(
cursor.map(|x| *x),
limit + 1,
descending_order.unwrap_or(false),
)
})
.await?;

let mut epoch_metrics = epochs
.into_iter()
.map(|e| EpochMetrics {
epoch: e.epoch,
epoch_total_transactions: e.epoch_total_transactions,
first_checkpoint_id: e.first_checkpoint_id,
epoch_start_timestamp: e.epoch_start_timestamp,
end_of_epoch_info: e.end_of_epoch_info,
})
.collect::<Vec<_>>();

let has_next_page = epoch_metrics.len() > limit;
epoch_metrics.truncate(limit);
let next_cursor = epoch_metrics.last().map(|e| e.epoch);
Ok(Page {
data: epoch_metrics,
next_cursor: next_cursor.map(|id| id.into()),
has_next_page,
})
}

async fn get_current_epoch(&self) -> RpcResult<EpochInfo> {
let stored_epoch = self
.inner
Expand All @@ -64,13 +105,47 @@ impl ExtendedApiServer for ExtendedApi {
EpochInfo::try_from(stored_epoch).map_err(Into::into)
}

async fn query_objects(
async fn get_network_metrics(&self) -> RpcResult<NetworkMetrics> {
let network_metrics = self
.inner
.spawn_blocking(|this| this.get_latest_network_metrics())
.await?;
Ok(network_metrics)
}

async fn get_move_call_metrics(&self) -> RpcResult<MoveCallMetrics> {
let move_call_metrics = self
.inner
.spawn_blocking(|this| this.get_latest_move_call_metrics())
.await?;
Ok(move_call_metrics)
}

async fn get_latest_address_metrics(&self) -> RpcResult<AddressMetrics> {
let latest_address_metrics = self
.inner
.spawn_blocking(|this| this.get_latest_address_metrics())
.await?;
Ok(latest_address_metrics)
}

async fn get_checkpoint_address_metrics(&self, checkpoint: u64) -> RpcResult<AddressMetrics> {
let checkpoint_address_metrics = self
.inner
.spawn_blocking(move |this| this.get_checkpoint_address_metrics(checkpoint))
.await?;
Ok(checkpoint_address_metrics)
}

async fn get_all_epoch_address_metrics(
&self,
_query: IotaObjectResponseQuery,
_cursor: Option<CheckpointedObjectID>,
_limit: Option<usize>,
) -> RpcResult<QueryObjectsPage> {
Err(jsonrpsee::types::error::ErrorCode::MethodNotFound.into())
descending_order: Option<bool>,
) -> RpcResult<Vec<AddressMetrics>> {
let all_epoch_address_metrics = self
.inner
.spawn_blocking(move |this| this.get_all_epoch_address_metrics(descending_order))
.await?;
Ok(all_epoch_address_metrics)
}

async fn get_total_transactions(&self) -> RpcResult<BigInt<u64>> {
Expand Down
1 change: 0 additions & 1 deletion crates/iota-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::{

pub mod checkpoint_handler;
pub mod committer;
pub mod objects_snapshot_processor;
pub mod tx_processor;

#[derive(Debug)]
Expand Down
24 changes: 19 additions & 5 deletions crates/iota-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ use crate::{
build_json_rpc_server,
errors::IndexerError,
framework::fetcher::CheckpointFetcher,
handlers::{
checkpoint_handler::new_handlers,
objects_snapshot_processor::{ObjectsSnapshotProcessor, SnapshotLagConfig},
},
handlers::checkpoint_handler::new_handlers,
indexer_reader::IndexerReader,
metrics::IndexerMetrics,
store::IndexerStore,
processors::{
objects_snapshot_processor::{ObjectsSnapshotProcessor, SnapshotLagConfig},
processor_orchestrator::ProcessorOrchestrator,
},
store::{IndexerStore, PgIndexerAnalyticalStore},
IndexerConfig,
};

Expand Down Expand Up @@ -111,4 +112,17 @@ impl Indexer {

Ok(())
}

pub async fn start_analytical_worker(
store: PgIndexerAnalyticalStore,
metrics: IndexerMetrics,
) -> Result<(), IndexerError> {
info!(
"Iota Indexer Analytical Worker (version {:?}) started...",
env!("CARGO_PKG_VERSION")
);
let mut processor_orchestrator = ProcessorOrchestrator::new(store, metrics);
processor_orchestrator.run_forever().await;
Ok(())
}
}
126 changes: 122 additions & 4 deletions crates/iota-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ use diesel::{
};
use fastcrypto::encoding::{Encoding, Hex};
use iota_json_rpc_types::{
Balance, CheckpointId, Coin as IotaCoin, DisplayFieldsResponse, EpochInfo, EventFilter,
IotaCoinMetadata, IotaEvent, IotaObjectDataFilter, IotaTransactionBlockEffects,
IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse, TransactionFilter,
AddressMetrics, Balance, CheckpointId, Coin as IotaCoin, DisplayFieldsResponse, EpochInfo,
EventFilter, IotaCoinMetadata, IotaEvent, IotaObjectDataFilter, IotaTransactionBlockEffects,
IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse, MoveCallMetrics,
MoveFunctionName, NetworkMetrics, TransactionFilter,
};
use iota_types::{
balance::Supply,
Expand All @@ -39,17 +40,21 @@ use crate::{
db::{PgConnectionConfig, PgConnectionPoolConfig, PgPoolConnection},
errors::IndexerError,
models::{
address_metrics::StoredAddressMetrics,
checkpoints::StoredCheckpoint,
display::StoredDisplay,
epoch::StoredEpochInfo,
events::StoredEvent,
move_call_metrics::QueriedMoveCallMetrics,
network_metrics::StoredNetworkMetrics,
objects::{CoinBalance, ObjectRefColumn, StoredObject},
packages::StoredPackage,
transactions::StoredTransaction,
tx_indices::TxSequenceNumber,
},
schema::{
checkpoints, display, epochs, events, objects, objects_snapshot, packages, transactions,
address_metrics, checkpoints, display, epochs, events, move_call_metrics, objects,
objects_snapshot, packages, transactions,
},
types::{IndexerResult, OwnerType},
};
Expand Down Expand Up @@ -1539,6 +1544,119 @@ impl IndexerReader {
.collect::<IndexerResult<Vec<_>>>()
}

pub fn get_latest_network_metrics(&self) -> IndexerResult<NetworkMetrics> {
let metrics = self.run_query(|conn| {
diesel::sql_query("SELECT * FROM network_metrics;")
.get_result::<StoredNetworkMetrics>(conn)
})?;
Ok(metrics.into())
}

pub fn get_latest_move_call_metrics(&self) -> IndexerResult<MoveCallMetrics> {
let latest_3d_move_call_metrics = self.run_query(|conn| {
move_call_metrics::table
.filter(move_call_metrics::dsl::day.eq(3))
.order(move_call_metrics::dsl::id.desc())
.limit(10)
.load::<QueriedMoveCallMetrics>(conn)
})?;
let latest_7d_move_call_metrics = self.run_query(|conn| {
move_call_metrics::table
.filter(move_call_metrics::dsl::day.eq(7))
.order(move_call_metrics::dsl::id.desc())
.limit(10)
.load::<QueriedMoveCallMetrics>(conn)
})?;
let latest_30d_move_call_metrics = self.run_query(|conn| {
move_call_metrics::table
.filter(move_call_metrics::dsl::day.eq(30))
.order(move_call_metrics::dsl::id.desc())
.limit(10)
.load::<QueriedMoveCallMetrics>(conn)
})?;

let latest_3_days: Vec<(MoveFunctionName, usize)> = latest_3d_move_call_metrics
.into_iter()
.map(|m| m.try_into())
.collect::<Result<Vec<_>, _>>()?;
let latest_7_days: Vec<(MoveFunctionName, usize)> = latest_7d_move_call_metrics
.into_iter()
.map(|m| m.try_into())
.collect::<Result<Vec<_>, _>>()?;
let latest_30_days: Vec<(MoveFunctionName, usize)> = latest_30d_move_call_metrics
.into_iter()
.map(|m| m.try_into())
.collect::<Result<Vec<_>, _>>()?;
// sort by call count desc.
let rank_3_days = latest_3_days
.into_iter()
.sorted_by(|a, b| b.1.cmp(&a.1))
.collect::<Vec<_>>();
let rank_7_days = latest_7_days
.into_iter()
.sorted_by(|a, b| b.1.cmp(&a.1))
.collect::<Vec<_>>();
let rank_30_days = latest_30_days
.into_iter()
.sorted_by(|a, b| b.1.cmp(&a.1))
.collect::<Vec<_>>();
Ok(MoveCallMetrics {
rank_3_days,
rank_7_days,
rank_30_days,
})
}

pub fn get_latest_address_metrics(&self) -> IndexerResult<AddressMetrics> {
let stored_address_metrics = self.run_query(|conn| {
address_metrics::table
.order(address_metrics::dsl::checkpoint.desc())
.first::<StoredAddressMetrics>(conn)
})?;
Ok(stored_address_metrics.into())
}

pub fn get_checkpoint_address_metrics(
&self,
checkpoint_seq: u64,
) -> IndexerResult<AddressMetrics> {
let stored_address_metrics = self.run_query(|conn| {
address_metrics::table
.filter(address_metrics::dsl::checkpoint.eq(checkpoint_seq as i64))
.first::<StoredAddressMetrics>(conn)
})?;
Ok(stored_address_metrics.into())
}

pub fn get_all_epoch_address_metrics(
&self,
descending_order: Option<bool>,
) -> IndexerResult<Vec<AddressMetrics>> {
let is_descending = descending_order.unwrap_or_default();
let epoch_address_metrics_query = format!(
"WITH ranked_rows AS (
SELECT
checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses,
row_number() OVER(PARTITION BY epoch ORDER BY checkpoint DESC) as row_num
FROM
address_metrics
)
SELECT
checkpoint, epoch, timestamp_ms, cumulative_addresses, cumulative_active_addresses, daily_active_addresses
FROM ranked_rows
WHERE row_num = 1 ORDER BY epoch {}",
if is_descending { "DESC" } else { "ASC" },
);
let epoch_address_metrics = self.run_query(|conn| {
diesel::sql_query(epoch_address_metrics_query).load::<StoredAddressMetrics>(conn)
})?;

Ok(epoch_address_metrics
.into_iter()
.map(|stored_address_metrics| stored_address_metrics.into())
.collect())
}

pub(crate) async fn get_display_fields(
&self,
original_object: &iota_types::object::Object,
Expand Down
4 changes: 4 additions & 0 deletions crates/iota-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod indexer;
pub mod indexer_reader;
pub mod metrics;
pub mod models;
pub mod processors;
pub mod schema;
pub mod store;
pub mod test_utils;
Expand Down Expand Up @@ -74,6 +75,8 @@ pub struct IndexerConfig {
pub fullnode_sync_worker: bool,
#[clap(long)]
pub rpc_server_worker: bool,
#[clap(long)]
pub analytical_worker: bool,
}

impl IndexerConfig {
Expand Down Expand Up @@ -136,6 +139,7 @@ impl Default for IndexerConfig {
reset_db: false,
fullnode_sync_worker: true,
rpc_server_worker: true,
analytical_worker: false,
}
}
}
Expand Down
Loading

0 comments on commit d423314

Please sign in to comment.