From 2e259525d3ca335451f08dd4d0f2c2ab9d3d9844 Mon Sep 17 00:00:00 2001 From: Alex Coats Date: Wed, 21 Feb 2024 13:53:08 -0500 Subject: [PATCH] First round of updates for influx measurements. Split address activity response by type. --- Cargo.lock | 40 +++++ Cargo.toml | 15 +- src/analytics/influx.rs | 96 +++++++++-- src/analytics/ledger/active_addresses.rs | 69 ++++++-- src/analytics/ledger/address_balance.rs | 153 ++++++++++++++---- src/analytics/ledger/base_token.rs | 13 +- src/analytics/ledger/features.rs | 73 +++++++++ src/analytics/ledger/ledger_outputs.rs | 67 +++++++- src/analytics/ledger/ledger_size.rs | 9 +- src/analytics/ledger/mod.rs | 2 + src/analytics/ledger/output_activity.rs | 148 +++++++++++------ src/analytics/ledger/transaction_size.rs | 9 +- src/analytics/ledger/unlock_conditions.rs | 10 +- src/analytics/mod.rs | 68 +++++--- src/analytics/tangle/block_issuers.rs | 39 +++++ src/analytics/tangle/mana_activity.rs | 70 ++++++++ src/analytics/tangle/mod.rs | 8 +- .../inx-chronicle/api/explorer/responses.rs | 1 + src/db/influxdb/config.rs | 14 +- src/db/mongodb/collections/outputs/mod.rs | 75 +++++---- src/inx/client.rs | 18 --- 21 files changed, 810 insertions(+), 187 deletions(-) create mode 100644 src/analytics/ledger/features.rs create mode 100644 src/analytics/tangle/block_issuers.rs create mode 100644 src/analytics/tangle/mana_activity.rs diff --git a/Cargo.lock b/Cargo.lock index 603ed47f4..66ce0a0fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -542,6 +542,7 @@ dependencies = [ "pretty_assertions", "primitive-types", "rand", + "rayon", "regex", "rust-argon2 2.1.0", "serde", @@ -675,6 +676,25 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.19" @@ -2641,6 +2661,26 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + [[package]] name = "redox_syscall" version = "0.4.1" diff --git a/Cargo.toml b/Cargo.toml index 9e6861fa8..6a2e4892f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ dotenvy = { version = "0.15", default-features = false } eyre = { version = "0.6", default-features = false, features = [ "track-caller", "auto-install" ] } futures = { version = "0.3", default-features = false } hex = { version = "0.4", default-features = false } -humantime = { version = "2.1.0", default-features = false } +humantime = { version = "2.1", default-features = false } humantime-serde = { version = "1.1", default-features = false } iota-crypto = { version = "0.23", default-features = false, features = [ "blake2b", "ed25519", "slip10", "bip39-en", "random", "zeroize" ] } iota-sdk = { git = "https://github.com/iotaledger/iota-sdk", branch = "2.0", default-features = false, features = [ "std", "serde" ] } @@ -55,20 +55,21 @@ uuid = { version = "1.3", default-features = false, features = [ "v4" ] } # Optional chrono = { version = "0.4", default-features = false, features = [ "std" ], optional = true } influxdb = { version = "0.7", default-features = false, features = [ "use-serde", "reqwest-client-rustls", "derive" ], optional = true } +rayon = { version = "1.8", default-features = false } # API auth-helper = { version = "0.3", default-features = false, optional = true } -axum = { version = "0.7.4", default-features = false, features = [ "http1", "json", "query", "original-uri", "tokio", "macros" ], optional = true } -axum-extra = { version = "*", default-features = false, features = [ "typed-header" ] } +axum = { version = "0.7", default-features = false, features = [ "http1", "json", "query", "original-uri", "tokio", "macros" ], optional = true } +axum-extra = { version = "0.9", default-features = false, features = [ "typed-header" ] } ed25519-zebra = { version = "4.0", default-features = false, features = [ "std", "pkcs8", "pem" ], optional = true } hyper = { version = "1.1.0", default-features = false, features = [ "server" ], optional = true } -hyper-util = { version = "0.1.3", default-features = false } +hyper-util = { version = "0.1", default-features = false } rand = { version = "0.8", default-features = false, features = [ "std" ], optional = true } -regex = { version = "1.7", default-features = false, features = [ "std" ], optional = true } -rust-argon2 = { version = "2.0.0", default-features = false, optional = true } +regex = { version = "1.8.4", default-features = false, features = [ "std" ], optional = true } +rust-argon2 = { version = "2.0", default-features = false, optional = true } serde_urlencoded = { version = "0.7", default-features = false, optional = true } tower = { version = "0.4", default-features = false, optional = true } -tower-http = { version = "0.5.1", default-features = false, features = [ "cors", "catch-panic", "trace" ], optional = true } +tower-http = { version = "0.5", default-features = false, features = [ "cors", "catch-panic", "trace" ], optional = true } zeroize = { version = "1.5", default-features = false, features = [ "std", "zeroize_derive" ], optional = true } # INX diff --git a/src/analytics/influx.rs b/src/analytics/influx.rs index 69796e175..7b6f7720a 100644 --- a/src/analytics/influx.rs +++ b/src/analytics/influx.rs @@ -8,10 +8,11 @@ use iota_sdk::types::block::protocol::ProtocolParameters; use super::{ ledger::{ - AddressActivityMeasurement, AddressBalanceMeasurement, BaseTokenActivityMeasurement, LedgerOutputMeasurement, - LedgerSizeMeasurement, OutputActivityMeasurement, TransactionSizeMeasurement, UnlockConditionMeasurement, + AddressActivityMeasurement, AddressBalanceMeasurement, BaseTokenActivityMeasurement, FeaturesMeasurement, + LedgerOutputMeasurement, LedgerSizeMeasurement, OutputActivityMeasurement, TransactionSizeMeasurement, + UnlockConditionMeasurement, }, - tangle::{BlockActivityMeasurement, SlotSizeMeasurement}, + tangle::{BlockActivityMeasurement, BlockIssuerMeasurement, ManaActivityMeasurement, SlotSizeMeasurement}, AnalyticsInterval, PerInterval, PerSlot, }; use crate::db::influxdb::InfluxDb; @@ -112,11 +113,42 @@ impl Measurement for AddressBalanceMeasurement { const NAME: &'static str = "iota_addresses"; fn add_fields(&self, query: WriteQuery) -> WriteQuery { - let mut query = query.add_field("address_with_balance_count", self.address_with_balance_count as u64); + let mut query = query + .add_field( + "ed25519_address_with_balance_count", + self.ed25519_address_with_balance_count as u64, + ) + .add_field( + "account_address_with_balance_count", + self.account_address_with_balance_count as u64, + ) + .add_field( + "nft_address_with_balance_count", + self.nft_address_with_balance_count as u64, + ) + .add_field( + "anchor_address_with_balance_count", + self.anchor_address_with_balance_count as u64, + ) + .add_field( + "implicit_account_address_with_balance_count", + self.implicit_address_with_balance_count as u64, + ); for (index, stat) in self.token_distribution.iter().enumerate() { query = query - .add_field(format!("address_count_{index}"), stat.address_count) - .add_field(format!("total_amount_{index}"), stat.total_amount); + .add_field(format!("ed25519_address_count_{index}"), stat.ed25519_count as u64) + .add_field(format!("ed25519_total_amount_{index}"), stat.ed25519_amount) + .add_field(format!("account_address_count_{index}"), stat.account_count as u64) + .add_field(format!("account_total_amount_{index}"), stat.account_amount) + .add_field(format!("nft_address_count_{index}"), stat.nft_count as u64) + .add_field(format!("nft_total_amount_{index}"), stat.nft_amount) + .add_field(format!("anchor_address_count_{index}"), stat.anchor_count as u64) + .add_field(format!("anchor_total_amount_{index}"), stat.anchor_amount) + .add_field( + format!("implicit_account_address_count_{index}"), + stat.implicit_count as u64, + ) + .add_field(format!("implicit_account_total_amount_{index}"), stat.implicit_amount); } query } @@ -158,11 +190,35 @@ impl Measurement for BlockActivityMeasurement { } } +impl Measurement for BlockIssuerMeasurement { + const NAME: &'static str = "iota_block_issuer_activity"; + + fn add_fields(&self, query: WriteQuery) -> WriteQuery { + query.add_field("active_issuer_count", self.active_issuer_count as u64) + } +} + +impl Measurement for ManaActivityMeasurement { + const NAME: &'static str = "iota_mana_activity"; + + fn add_fields(&self, query: WriteQuery) -> WriteQuery { + query + .add_field("rewards_claimed", self.rewards_claimed) + .add_field("mana_burned", self.mana_burned) + .add_field("bic_burned", self.bic_burned) + } +} + impl Measurement for AddressActivityMeasurement { const NAME: &'static str = "iota_active_addresses"; fn add_fields(&self, query: WriteQuery) -> WriteQuery { - query.add_field("count", self.count as u64) + query + .add_field("ed25519_count", self.ed25519_count as u64) + .add_field("account_count", self.account_count as u64) + .add_field("nft_count", self.nft_count as u64) + .add_field("anchor_count", self.anchor_count as u64) + .add_field("implicit_account_count", self.implicit_count as u64) } } @@ -203,8 +259,9 @@ impl Measurement for LedgerOutputMeasurement { query .add_field("basic_count", self.basic.count as u64) .add_field("basic_amount", self.basic.amount) - .add_field("account_count", self.account.count as u64) - .add_field("account_amount", self.account.amount) + .add_field("account_count", self.account.count_and_amount.count as u64) + .add_field("account_amount", self.account.count_and_amount.amount) + .add_field("block_issuer_accounts", self.account.block_issuers_count as u64) .add_field("anchor_count", self.anchor.count as u64) .add_field("anchor_amount", self.anchor.amount) .add_field("foundry_count", self.foundry.count as u64) @@ -251,6 +308,10 @@ impl Measurement for OutputActivityMeasurement { fn add_fields(&self, query: WriteQuery) -> WriteQuery { query .add_field("account_created_count", self.account.created_count as u64) + .add_field( + "account_block_issuer_key_rotated_count", + self.account.block_issuer_key_rotated as u64, + ) .add_field("account_destroyed_count", self.account.destroyed_count as u64) .add_field("anchor_created_count", self.anchor.created_count as u64) .add_field("anchor_state_changed_count", self.anchor.state_changed_count as u64) @@ -266,7 +327,10 @@ impl Measurement for OutputActivityMeasurement { .add_field("foundry_transferred_count", self.foundry.transferred_count as u64) .add_field("foundry_destroyed_count", self.foundry.destroyed_count as u64) .add_field("delegation_created_count", self.delegation.created_count as u64) + .add_field("delegation_delayed_count", self.delegation.delayed_count as u64) .add_field("delegation_destroyed_count", self.delegation.destroyed_count as u64) + .add_field("native_token_minted_count", self.native_token.minted_count as u64) + .add_field("native_token_melted_count", self.native_token.melted_count as u64) } } @@ -297,6 +361,20 @@ impl Measurement for UnlockConditionMeasurement { } } +impl Measurement for FeaturesMeasurement { + const NAME: &'static str = "iota_features"; + + fn add_fields(&self, query: WriteQuery) -> WriteQuery { + query + .add_field("native_tokens_count", self.native_tokens.count as u64) + .add_field("native_tokens_amount", self.native_tokens.amount) + .add_field("block_issuer_key_count", self.block_issuer.count as u64) + .add_field("block_issuer_key_amount", self.block_issuer.amount) + .add_field("staking_count", self.staking.count as u64) + .add_field("staking_amount", self.staking.amount) + } +} + impl InfluxDb { /// Writes a [`Measurement`] to the InfluxDB database. pub(super) async fn insert_measurement(&self, measurement: impl PrepareQuery) -> Result<(), influxdb::Error> { diff --git a/src/analytics/ledger/active_addresses.rs b/src/analytics/ledger/active_addresses.rs index 9cfa6783a..22845d8b2 100644 --- a/src/analytics/ledger/active_addresses.rs +++ b/src/analytics/ledger/active_addresses.rs @@ -3,7 +3,10 @@ use std::collections::HashSet; -use iota_sdk::types::block::address::{Bech32Address, ToBech32Ext}; +use iota_sdk::types::block::{ + address::{AccountAddress, Address, AnchorAddress, Ed25519Address, ImplicitAccountCreationAddress, NftAddress}, + payload::SignedTransactionPayload, +}; use crate::{ analytics::{Analytics, AnalyticsContext, AnalyticsInterval, IntervalAnalytics}, @@ -13,14 +16,22 @@ use crate::{ #[derive(Debug, Default)] pub(crate) struct AddressActivityMeasurement { - pub(crate) count: usize, + pub(crate) ed25519_count: usize, + pub(crate) account_count: usize, + pub(crate) nft_count: usize, + pub(crate) anchor_count: usize, + pub(crate) implicit_count: usize, } /// Computes the number of addresses that were active during a given time interval. #[allow(missing_docs)] #[derive(Debug, Default)] pub(crate) struct AddressActivityAnalytics { - addresses: HashSet, + ed25519_addresses: HashSet, + account_addresses: HashSet, + nft_addresses: HashSet, + anchor_addresses: HashSet, + implicit_addresses: HashSet, } #[async_trait::async_trait] @@ -33,35 +44,73 @@ impl IntervalAnalytics for AddressActivityMeasurement { interval: AnalyticsInterval, db: &MongoDb, ) -> eyre::Result { - let count = db + let res = db .collection::() .get_address_activity_count_in_range(start_date, interval.end_date(&start_date)) .await?; - Ok(AddressActivityMeasurement { count }) + Ok(AddressActivityMeasurement { + ed25519_count: res.ed25519_count, + account_count: res.account_count, + nft_count: res.nft_count, + anchor_count: res.anchor_count, + implicit_count: res.implicit_count, + }) } } impl Analytics for AddressActivityAnalytics { type Measurement = AddressActivityMeasurement; - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) { - let hrp = ctx.protocol_parameters().bech32_hrp(); + fn handle_transaction( + &mut self, + _payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + _ctx: &dyn AnalyticsContext, + ) { for output in consumed { if let Some(a) = output.address() { - self.addresses.insert(a.clone().to_bech32(hrp)); + self.add_address(a); } } for output in created { if let Some(a) = output.address() { - self.addresses.insert(a.clone().to_bech32(hrp)); + self.add_address(a); } } } fn take_measurement(&mut self, _ctx: &dyn AnalyticsContext) -> Self::Measurement { AddressActivityMeasurement { - count: std::mem::take(self).addresses.len(), + ed25519_count: std::mem::take(&mut self.ed25519_addresses).len(), + account_count: std::mem::take(&mut self.account_addresses).len(), + nft_count: std::mem::take(&mut self.nft_addresses).len(), + anchor_count: std::mem::take(&mut self.anchor_addresses).len(), + implicit_count: std::mem::take(&mut self.implicit_addresses).len(), + } + } +} + +impl AddressActivityAnalytics { + fn add_address(&mut self, address: &Address) { + match address { + Address::Ed25519(a) => { + self.ed25519_addresses.insert(*a); + } + Address::Account(a) => { + self.account_addresses.insert(*a); + } + Address::Nft(a) => { + self.nft_addresses.insert(*a); + } + Address::Anchor(a) => { + self.anchor_addresses.insert(*a); + } + Address::ImplicitAccountCreation(a) => { + self.implicit_addresses.insert(*a); + } + _ => (), } } } diff --git a/src/analytics/ledger/address_balance.rs b/src/analytics/ledger/address_balance.rs index 5c307ee01..2c5914ae2 100644 --- a/src/analytics/ledger/address_balance.rs +++ b/src/analytics/ledger/address_balance.rs @@ -3,7 +3,10 @@ use std::collections::HashMap; -use iota_sdk::types::block::address::Address; +use iota_sdk::types::block::{ + address::{AccountAddress, Address, AnchorAddress, Ed25519Address, ImplicitAccountCreationAddress, NftAddress}, + payload::SignedTransactionPayload, +}; use serde::{Deserialize, Serialize}; use crate::{ @@ -13,58 +16,128 @@ use crate::{ #[derive(Debug)] pub(crate) struct AddressBalanceMeasurement { - pub(crate) address_with_balance_count: usize, + pub(crate) ed25519_address_with_balance_count: usize, + pub(crate) account_address_with_balance_count: usize, + pub(crate) nft_address_with_balance_count: usize, + pub(crate) anchor_address_with_balance_count: usize, + pub(crate) implicit_address_with_balance_count: usize, pub(crate) token_distribution: Vec, } /// Statistics for a particular logarithmic range of balances. #[derive(Copy, Clone, Debug, Default)] pub(crate) struct DistributionStat { - /// The number of unique addresses in this range. - pub(crate) address_count: u64, - /// The total amount of tokens in this range. - pub(crate) total_amount: u64, + pub(crate) ed25519_count: usize, + pub(crate) ed25519_amount: u64, + pub(crate) account_count: usize, + pub(crate) account_amount: u64, + pub(crate) nft_count: usize, + pub(crate) nft_amount: u64, + pub(crate) anchor_count: usize, + pub(crate) anchor_amount: u64, + pub(crate) implicit_count: usize, + pub(crate) implicit_amount: u64, } /// Computes the number of addresses the currently hold a balance. -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Default)] pub(crate) struct AddressBalancesAnalytics { - balances: HashMap, + ed25519_balances: HashMap, + account_balances: HashMap, + nft_balances: HashMap, + anchor_balances: HashMap, + implicit_balances: HashMap, } impl AddressBalancesAnalytics { /// Initialize the analytics by reading the current ledger state. pub(crate) fn init<'a>(unspent_outputs: impl IntoIterator) -> Self { - let mut balances = HashMap::new(); + let mut balances = AddressBalancesAnalytics::default(); for output in unspent_outputs { if let Some(a) = output.address() { - *balances.entry(a.clone()).or_default() += output.amount(); + balances.add_address(a, output.amount()); + } + } + balances + } + + fn add_address(&mut self, address: &Address, output_amount: u64) { + match address { + Address::Ed25519(a) => *self.ed25519_balances.entry(*a).or_default() += output_amount, + Address::Account(a) => *self.account_balances.entry(*a).or_default() += output_amount, + Address::Nft(a) => *self.nft_balances.entry(*a).or_default() += output_amount, + Address::Anchor(a) => *self.anchor_balances.entry(*a).or_default() += output_amount, + Address::ImplicitAccountCreation(a) => *self.implicit_balances.entry(*a).or_default() += output_amount, + _ => (), + } + } + + fn remove_amount(&mut self, address: &Address, output_amount: u64) { + match address { + Address::Ed25519(a) => { + if let Some(amount) = self.ed25519_balances.get_mut(a) { + *amount -= output_amount; + if *amount == 0 { + self.ed25519_balances.remove(a); + } + } + } + Address::Account(a) => { + if let Some(amount) = self.account_balances.get_mut(a) { + *amount -= output_amount; + if *amount == 0 { + self.account_balances.remove(a); + } + } + } + Address::Nft(a) => { + if let Some(amount) = self.nft_balances.get_mut(a) { + *amount -= output_amount; + if *amount == 0 { + self.nft_balances.remove(a); + } + } + } + Address::Anchor(a) => { + if let Some(amount) = self.anchor_balances.get_mut(a) { + *amount -= output_amount; + if *amount == 0 { + self.anchor_balances.remove(a); + } + } + } + Address::ImplicitAccountCreation(a) => { + if let Some(amount) = self.implicit_balances.get_mut(a) { + *amount -= output_amount; + if *amount == 0 { + self.implicit_balances.remove(a); + } + } } + _ => (), } - Self { balances } } } impl Analytics for AddressBalancesAnalytics { type Measurement = AddressBalanceMeasurement; - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) { + fn handle_transaction( + &mut self, + _payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + _ctx: &dyn AnalyticsContext, + ) { for output in consumed { - if let Some(a) = output.address() { - // All inputs should be present in `addresses`. If not, we skip it's value. - if let Some(amount) = self.balances.get_mut(a) { - *amount -= output.amount(); - if *amount == 0 { - self.balances.remove(a); - } - } + if let Some(address) = output.address() { + self.remove_amount(address, output.amount()); } } for output in created { - if let Some(a) = output.address() { - // All inputs should be present in `addresses`. If not, we skip it's value. - *self.balances.entry(a.clone()).or_default() += output.amount(); + if let Some(address) = output.address() { + self.add_address(address, output.amount()) } } } @@ -73,14 +146,38 @@ impl Analytics for AddressBalancesAnalytics { let bucket_max = ctx.protocol_parameters().token_supply().ilog10() as usize + 1; let mut token_distribution = vec![DistributionStat::default(); bucket_max]; - for amount in self.balances.values() { - // Balances are partitioned into ranges defined by: [10^index..10^(index+1)). + // Balances are partitioned into ranges defined by: [10^index..10^(index+1)). + for amount in self.ed25519_balances.values() { + let index = amount.ilog10() as usize; + token_distribution[index].ed25519_count += 1; + token_distribution[index].ed25519_amount += *amount; + } + for amount in self.account_balances.values() { + let index = amount.ilog10() as usize; + token_distribution[index].account_count += 1; + token_distribution[index].account_amount += *amount; + } + for amount in self.nft_balances.values() { + let index = amount.ilog10() as usize; + token_distribution[index].nft_count += 1; + token_distribution[index].nft_amount += *amount; + } + for amount in self.anchor_balances.values() { + let index = amount.ilog10() as usize; + token_distribution[index].anchor_count += 1; + token_distribution[index].anchor_amount += *amount; + } + for amount in self.implicit_balances.values() { let index = amount.ilog10() as usize; - token_distribution[index].address_count += 1; - token_distribution[index].total_amount += *amount; + token_distribution[index].implicit_count += 1; + token_distribution[index].implicit_amount += *amount; } AddressBalanceMeasurement { - address_with_balance_count: self.balances.len(), + ed25519_address_with_balance_count: self.ed25519_balances.len(), + account_address_with_balance_count: self.account_balances.len(), + nft_address_with_balance_count: self.nft_balances.len(), + anchor_address_with_balance_count: self.anchor_balances.len(), + implicit_address_with_balance_count: self.implicit_balances.len(), token_distribution, } } diff --git a/src/analytics/ledger/base_token.rs b/src/analytics/ledger/base_token.rs index 77f8f8bc4..511ccaaf5 100644 --- a/src/analytics/ledger/base_token.rs +++ b/src/analytics/ledger/base_token.rs @@ -3,7 +3,10 @@ use std::collections::HashMap; -use iota_sdk::types::block::address::{Bech32Address, ToBech32Ext}; +use iota_sdk::types::block::{ + address::{Bech32Address, ToBech32Ext}, + payload::SignedTransactionPayload, +}; use crate::{ analytics::{Analytics, AnalyticsContext}, @@ -23,7 +26,13 @@ pub(crate) struct BaseTokenActivityMeasurement { impl Analytics for BaseTokenActivityMeasurement { type Measurement = Self; - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) { + fn handle_transaction( + &mut self, + _payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + ctx: &dyn AnalyticsContext, + ) { let hrp = ctx.protocol_parameters().bech32_hrp(); // The idea behind the following code is that we keep track of the deltas that are applied to each account that // is represented by an address. diff --git a/src/analytics/ledger/features.rs b/src/analytics/ledger/features.rs new file mode 100644 index 000000000..3ef75819c --- /dev/null +++ b/src/analytics/ledger/features.rs @@ -0,0 +1,73 @@ +// Copyright 2023 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use iota_sdk::types::block::{output::Feature, payload::SignedTransactionPayload}; +use serde::{Deserialize, Serialize}; + +use super::CountAndAmount; +use crate::{ + analytics::{Analytics, AnalyticsContext}, + model::ledger::{LedgerOutput, LedgerSpent}, +}; + +#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] +#[allow(missing_docs)] +pub(crate) struct FeaturesMeasurement { + pub(crate) native_tokens: CountAndAmount, + pub(crate) block_issuer: CountAndAmount, + pub(crate) staking: CountAndAmount, +} + +impl FeaturesMeasurement { + fn wrapping_add(&mut self, rhs: Self) { + self.native_tokens.wrapping_add(rhs.native_tokens); + self.block_issuer.wrapping_add(rhs.block_issuer); + self.staking.wrapping_add(rhs.staking); + } + + fn wrapping_sub(&mut self, rhs: Self) { + self.native_tokens.wrapping_sub(rhs.native_tokens); + self.block_issuer.wrapping_sub(rhs.block_issuer); + self.staking.wrapping_sub(rhs.staking); + } + + /// Initialize the analytics by reading the current ledger state. + pub(crate) fn init<'a>(unspent_outputs: impl IntoIterator) -> Self { + let mut measurement = Self::default(); + for output in unspent_outputs { + if let Some(features) = output.output().features() { + for feature in features.iter() { + match feature { + Feature::NativeToken(_) => measurement.native_tokens.add_output(output), + Feature::BlockIssuer(_) => measurement.block_issuer.add_output(output), + Feature::Staking(_) => measurement.staking.add_output(output), + _ => (), + } + } + } + } + measurement + } +} + +impl Analytics for FeaturesMeasurement { + type Measurement = Self; + + fn handle_transaction( + &mut self, + _payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + _ctx: &dyn AnalyticsContext, + ) { + let consumed = Self::init(consumed.iter().map(|input| &input.output)); + let created = Self::init(created); + + self.wrapping_add(created); + self.wrapping_sub(consumed); + } + + fn take_measurement(&mut self, _ctx: &dyn AnalyticsContext) -> Self::Measurement { + *self + } +} diff --git a/src/analytics/ledger/ledger_outputs.rs b/src/analytics/ledger/ledger_outputs.rs index 22b63040f..077569650 100644 --- a/src/analytics/ledger/ledger_outputs.rs +++ b/src/analytics/ledger/ledger_outputs.rs @@ -3,7 +3,12 @@ #![allow(missing_docs)] -use iota_sdk::types::block::output::Output; +use std::collections::HashSet; + +use iota_sdk::types::block::{ + output::{AccountId, Output}, + payload::SignedTransactionPayload, +}; use serde::{Deserialize, Serialize}; use super::CountAndAmount; @@ -14,7 +19,7 @@ use crate::{ #[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] pub(crate) struct LedgerOutputMeasurement { - pub(crate) account: CountAndAmount, + pub(crate) account: AccountOutputMeasurement, pub(crate) basic: CountAndAmount, pub(crate) nft: CountAndAmount, pub(crate) foundry: CountAndAmount, @@ -28,7 +33,12 @@ impl LedgerOutputMeasurement { let mut measurement = Self::default(); for output in unspent_outputs { match output.output() { - Output::Account(_) => measurement.account.add_output(output), + Output::Account(account_output) => { + measurement.account.count_and_amount.add_output(output); + if account_output.is_block_issuer() { + measurement.account.block_issuers_count += 1; + } + } Output::Basic(_) => measurement.basic.add_output(output), Output::Nft(_) => measurement.nft.add_output(output), Output::Foundry(_) => measurement.foundry.add_output(output), @@ -61,7 +71,38 @@ impl LedgerOutputMeasurement { impl Analytics for LedgerOutputMeasurement { type Measurement = Self; - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) { + fn handle_transaction( + &mut self, + _payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + _ctx: &dyn AnalyticsContext, + ) { + fn map(ledger_output: &LedgerOutput) -> Option { + ledger_output.output().as_account_opt().and_then(|output| { + output + .is_block_issuer() + .then_some(output.account_id_non_null(&ledger_output.output_id)) + }) + } + + let issuer_inputs = consumed + .iter() + .map(|o| &o.output) + .filter_map(map) + .collect::>(); + + let issuer_outputs = created.iter().filter_map(map).collect::>(); + + self.account.block_issuers_count = self + .account + .block_issuers_count + .wrapping_add(issuer_outputs.difference(&issuer_inputs).count()); + self.account.block_issuers_count = self + .account + .block_issuers_count + .wrapping_sub(issuer_inputs.difference(&issuer_outputs).count()); + let consumed = Self::init(consumed.iter().map(|input| &input.output)); let created = Self::init(created); @@ -73,3 +114,21 @@ impl Analytics for LedgerOutputMeasurement { *self } } + +#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] +pub(crate) struct AccountOutputMeasurement { + pub(crate) count_and_amount: CountAndAmount, + pub(crate) block_issuers_count: usize, +} + +impl AccountOutputMeasurement { + fn wrapping_add(&mut self, rhs: Self) { + self.count_and_amount.wrapping_add(rhs.count_and_amount); + self.block_issuers_count = self.block_issuers_count.wrapping_add(rhs.block_issuers_count); + } + + fn wrapping_sub(&mut self, rhs: Self) { + self.count_and_amount.wrapping_sub(rhs.count_and_amount); + self.block_issuers_count = self.block_issuers_count.wrapping_sub(rhs.block_issuers_count); + } +} diff --git a/src/analytics/ledger/ledger_size.rs b/src/analytics/ledger/ledger_size.rs index f1a3251c7..510b680f9 100644 --- a/src/analytics/ledger/ledger_size.rs +++ b/src/analytics/ledger/ledger_size.rs @@ -3,6 +3,7 @@ use iota_sdk::types::block::{ output::{Output, StorageScore}, + payload::SignedTransactionPayload, protocol::ProtocolParameters, }; use serde::{Deserialize, Serialize}; @@ -67,7 +68,13 @@ impl LedgerSizeAnalytics { impl Analytics for LedgerSizeAnalytics { type Measurement = LedgerSizeMeasurement; - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) { + fn handle_transaction( + &mut self, + _payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + ctx: &dyn AnalyticsContext, + ) { for output in created { self.measurement .wrapping_add(output.output().ledger_size(ctx.protocol_parameters())); diff --git a/src/analytics/ledger/mod.rs b/src/analytics/ledger/mod.rs index b750ad42b..8c7392c26 100644 --- a/src/analytics/ledger/mod.rs +++ b/src/analytics/ledger/mod.rs @@ -10,6 +10,7 @@ pub(super) use self::{ active_addresses::{AddressActivityAnalytics, AddressActivityMeasurement}, address_balance::{AddressBalanceMeasurement, AddressBalancesAnalytics}, base_token::BaseTokenActivityMeasurement, + features::FeaturesMeasurement, ledger_outputs::LedgerOutputMeasurement, ledger_size::{LedgerSizeAnalytics, LedgerSizeMeasurement}, output_activity::OutputActivityMeasurement, @@ -21,6 +22,7 @@ use crate::model::ledger::LedgerOutput; mod active_addresses; mod address_balance; mod base_token; +mod features; mod ledger_outputs; mod ledger_size; mod output_activity; diff --git a/src/analytics/ledger/output_activity.rs b/src/analytics/ledger/output_activity.rs index 40623b7b9..595e69b39 100644 --- a/src/analytics/ledger/output_activity.rs +++ b/src/analytics/ledger/output_activity.rs @@ -1,11 +1,12 @@ // Copyright 2023 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use iota_sdk::types::block::{ address::Address, - output::{AccountId, AnchorId, DelegationId}, + output::{AccountId, AccountOutput, AnchorId}, + payload::SignedTransactionPayload, }; use serde::{Deserialize, Serialize}; @@ -22,17 +23,25 @@ pub(crate) struct OutputActivityMeasurement { pub(crate) anchor: AnchorActivityMeasurement, pub(crate) foundry: FoundryActivityMeasurement, pub(crate) delegation: DelegationActivityMeasurement, + pub(crate) native_token: NativeTokenActivityMeasurement, } impl Analytics for OutputActivityMeasurement { type Measurement = Self; - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) { + fn handle_transaction( + &mut self, + _payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + _ctx: &dyn AnalyticsContext, + ) { self.nft.handle_transaction(consumed, created); self.account.handle_transaction(consumed, created); self.anchor.handle_transaction(consumed, created); self.foundry.handle_transaction(consumed, created); self.delegation.handle_transaction(consumed, created); + self.native_token.handle_transaction(consumed, created); } fn take_measurement(&mut self, _ctx: &dyn AnalyticsContext) -> Self::Measurement { @@ -75,45 +84,44 @@ impl NftActivityMeasurement { #[derive(Copy, Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub(crate) struct AccountActivityMeasurement { pub(crate) created_count: usize, + pub(crate) transferred_count: usize, + pub(crate) block_issuer_key_rotated: usize, pub(crate) destroyed_count: usize, } -struct AccountData { - account_id: AccountId, -} - -impl std::cmp::PartialEq for AccountData { - fn eq(&self, other: &Self) -> bool { - self.account_id == other.account_id - } -} - -impl std::cmp::Eq for AccountData {} - -impl std::hash::Hash for AccountData { - fn hash(&self, state: &mut H) { - self.account_id.hash(state); - } -} - impl AccountActivityMeasurement { fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput]) { - let map = |ledger_output: &LedgerOutput| { - ledger_output.output().as_account_opt().map(|output| AccountData { - account_id: output.account_id_non_null(&ledger_output.output_id), - }) - }; + fn map(ledger_output: &LedgerOutput) -> Option<(AccountId, &AccountOutput)> { + ledger_output + .output() + .as_account_opt() + .map(|output| (output.account_id_non_null(&ledger_output.output_id), output)) + } let account_inputs = consumed .iter() .map(|o| &o.output) .filter_map(map) - .collect::>(); - - let account_outputs = created.iter().filter_map(map).collect::>(); - - self.created_count += account_outputs.difference(&account_inputs).count(); - self.destroyed_count += account_inputs.difference(&account_outputs).count(); + .collect::>(); + + let account_outputs = created.iter().filter_map(map).collect::>(); + + self.created_count += account_outputs.difference_count(&account_inputs); + self.transferred_count += account_outputs.intersection_count(&account_inputs); + self.destroyed_count += account_inputs.difference_count(&account_outputs); + for (account_id, output_feature) in account_outputs + .into_iter() + .filter_map(|(id, o)| o.features().block_issuer().map(|f| (id, f))) + { + if let Some(input_feature) = account_inputs + .get(&account_id) + .and_then(|o| o.features().block_issuer()) + { + if input_feature.block_issuer_keys() != output_feature.block_issuer_keys() { + self.block_issuer_key_rotated += 1; + } + } + } } } @@ -213,33 +221,17 @@ impl FoundryActivityMeasurement { #[derive(Copy, Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub(crate) struct DelegationActivityMeasurement { pub(crate) created_count: usize, + pub(crate) delayed_count: usize, pub(crate) destroyed_count: usize, } -struct DelegationData { - delegation_id: DelegationId, -} - -impl std::cmp::PartialEq for DelegationData { - fn eq(&self, other: &Self) -> bool { - self.delegation_id == other.delegation_id - } -} - -impl std::cmp::Eq for DelegationData {} - -impl std::hash::Hash for DelegationData { - fn hash(&self, state: &mut H) { - self.delegation_id.hash(state); - } -} - impl DelegationActivityMeasurement { fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput]) { let map = |ledger_output: &LedgerOutput| { - ledger_output.output().as_delegation_opt().map(|output| DelegationData { - delegation_id: output.delegation_id_non_null(&ledger_output.output_id), - }) + ledger_output + .output() + .as_delegation_opt() + .map(|output| output.delegation_id_non_null(&ledger_output.output_id)) }; let delegation_inputs = consumed .iter() @@ -250,6 +242,56 @@ impl DelegationActivityMeasurement { let delegation_outputs = created.iter().filter_map(map).collect::>(); self.created_count += delegation_outputs.difference(&delegation_inputs).count(); + // self.delayed_count += todo!(); self.destroyed_count += delegation_inputs.difference(&delegation_outputs).count(); } } + +/// Delegation activity statistics. +#[derive(Copy, Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +pub(crate) struct NativeTokenActivityMeasurement { + pub(crate) minted_count: usize, + pub(crate) melted_count: usize, +} + +impl NativeTokenActivityMeasurement { + fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput]) { + let map = |ledger_output: &LedgerOutput| ledger_output.output().native_token().map(|nt| *nt.token_id()); + let native_token_inputs = consumed + .iter() + .map(|o| &o.output) + .filter_map(map) + .collect::>(); + + let native_token_outputs = created.iter().filter_map(map).collect::>(); + + self.minted_count += native_token_outputs.difference(&native_token_inputs).count(); + self.melted_count += native_token_inputs.difference(&native_token_outputs).count(); + } +} + +trait SetOps { + fn difference_count(&self, other: &Self) -> usize; + + fn intersection_count(&self, other: &Self) -> usize; +} + +impl SetOps for HashSet { + fn difference_count(&self, other: &Self) -> usize { + self.difference(other).count() + } + + fn intersection_count(&self, other: &Self) -> usize { + self.intersection(other).count() + } +} + +impl SetOps for HashMap { + fn difference_count(&self, other: &Self) -> usize { + self.keys().filter(|k| !other.contains_key(k)).count() + } + + fn intersection_count(&self, other: &Self) -> usize { + self.keys().filter(|k| other.contains_key(k)).count() + } +} diff --git a/src/analytics/ledger/transaction_size.rs b/src/analytics/ledger/transaction_size.rs index 903a30c4e..cd3a4f23f 100644 --- a/src/analytics/ledger/transaction_size.rs +++ b/src/analytics/ledger/transaction_size.rs @@ -1,6 +1,7 @@ // Copyright 2023 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 +use iota_sdk::types::block::payload::SignedTransactionPayload; use serde::{Deserialize, Serialize}; use crate::{ @@ -58,7 +59,13 @@ pub(crate) struct TransactionSizeMeasurement { impl Analytics for TransactionSizeMeasurement { type Measurement = TransactionSizeMeasurement; - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) { + fn handle_transaction( + &mut self, + _payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + _ctx: &dyn AnalyticsContext, + ) { self.input_buckets.add(consumed.len()); self.output_buckets.add(created.len()); } diff --git a/src/analytics/ledger/unlock_conditions.rs b/src/analytics/ledger/unlock_conditions.rs index 1648c35b9..8a9514e36 100644 --- a/src/analytics/ledger/unlock_conditions.rs +++ b/src/analytics/ledger/unlock_conditions.rs @@ -1,7 +1,7 @@ // Copyright 2023 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use iota_sdk::types::block::output::Output; +use iota_sdk::types::block::{output::Output, payload::SignedTransactionPayload}; use serde::{Deserialize, Serialize}; use super::CountAndAmount; @@ -77,7 +77,13 @@ impl UnlockConditionMeasurement { impl Analytics for UnlockConditionMeasurement { type Measurement = Self; - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], _ctx: &dyn AnalyticsContext) { + fn handle_transaction( + &mut self, + _payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + _ctx: &dyn AnalyticsContext, + ) { let consumed = Self::init(consumed.iter().map(|input| &input.output)); let created = Self::init(created); diff --git a/src/analytics/mod.rs b/src/analytics/mod.rs index 7559697b6..e34aa1e11 100644 --- a/src/analytics/mod.rs +++ b/src/analytics/mod.rs @@ -4,17 +4,23 @@ //! Various analytics that give insight into the usage of the tangle. use futures::TryStreamExt; -use iota_sdk::types::block::{output::OutputId, protocol::ProtocolParameters, slot::SlotIndex, Block}; +use iota_sdk::types::block::{ + output::OutputId, payload::SignedTransactionPayload, protocol::ProtocolParameters, slot::SlotIndex, Block, +}; +use rayon::iter::{IntoParallelRefMutIterator, ParallelIterator}; use thiserror::Error; use self::{ influx::PrepareQuery, ledger::{ AddressActivityAnalytics, AddressActivityMeasurement, AddressBalancesAnalytics, BaseTokenActivityMeasurement, - LedgerOutputMeasurement, LedgerSizeAnalytics, OutputActivityMeasurement, TransactionSizeMeasurement, - UnlockConditionMeasurement, + FeaturesMeasurement, LedgerOutputMeasurement, LedgerSizeAnalytics, OutputActivityMeasurement, + TransactionSizeMeasurement, UnlockConditionMeasurement, + }, + tangle::{ + BlockActivityMeasurement, BlockIssuerAnalytics, ManaActivityMeasurement, ProtocolParamsAnalytics, + SlotSizeMeasurement, }, - tangle::{BlockActivityMeasurement, ProtocolParamsAnalytics, SlotSizeMeasurement}, }; use crate::{ db::{ @@ -47,6 +53,7 @@ pub trait Analytics { /// Handle a transaction consisting of inputs (consumed [`LedgerSpent`]) and outputs (created [`LedgerOutput`]). fn handle_transaction( &mut self, + _payload: &SignedTransactionPayload, _consumed: &[LedgerSpent], _created: &[LedgerOutput], _ctx: &dyn AnalyticsContext, @@ -60,7 +67,13 @@ pub trait Analytics { // This trait allows using the above implementation dynamically trait DynAnalytics: Send { - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext); + fn handle_transaction( + &mut self, + payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + ctx: &dyn AnalyticsContext, + ); fn handle_block(&mut self, block: &Block, metadata: &BlockMetadata, ctx: &dyn AnalyticsContext); fn take_measurement(&mut self, ctx: &dyn AnalyticsContext) -> Box; } @@ -69,8 +82,14 @@ impl DynAnalytics for T where PerSlot: 'static + PrepareQuery, { - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) { - Analytics::handle_transaction(self, consumed, created, ctx) + fn handle_transaction( + &mut self, + payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + ctx: &dyn AnalyticsContext, + ) { + Analytics::handle_transaction(self, payload, consumed, created, ctx) } fn handle_block(&mut self, block: &Block, metadata: &BlockMetadata, ctx: &dyn AnalyticsContext) { @@ -145,17 +164,22 @@ impl Analytic { unspent_outputs: impl IntoIterator, ) -> Self { Self(match choice { + // Need ledger state AnalyticsChoice::AddressBalance => Box::new(AddressBalancesAnalytics::init(unspent_outputs)) as _, - AnalyticsChoice::BaseTokenActivity => Box::::default() as _, - AnalyticsChoice::BlockActivity => Box::::default() as _, - AnalyticsChoice::ActiveAddresses => Box::::default() as _, + AnalyticsChoice::Features => Box::new(FeaturesMeasurement::init(unspent_outputs)) as _, AnalyticsChoice::LedgerOutputs => Box::new(LedgerOutputMeasurement::init(unspent_outputs)) as _, AnalyticsChoice::LedgerSize => Box::new(LedgerSizeAnalytics::init(protocol_params, unspent_outputs)) as _, - AnalyticsChoice::SlotSize => Box::::default() as _, + AnalyticsChoice::UnlockConditions => Box::new(UnlockConditionMeasurement::init(unspent_outputs)) as _, + // Can default + AnalyticsChoice::ActiveAddresses => Box::::default() as _, + AnalyticsChoice::BaseTokenActivity => Box::::default() as _, + AnalyticsChoice::BlockActivity => Box::::default() as _, + AnalyticsChoice::BlockIssuerActivity => Box::::default() as _, + AnalyticsChoice::ManaActivity => Box::::default() as _, AnalyticsChoice::OutputActivity => Box::::default() as _, AnalyticsChoice::ProtocolParameters => Box::::default() as _, + AnalyticsChoice::SlotSize => Box::::default() as _, AnalyticsChoice::TransactionSizeDistribution => Box::::default() as _, - AnalyticsChoice::UnlockConditions => Box::new(UnlockConditionMeasurement::init(unspent_outputs)) as _, }) } } @@ -164,15 +188,21 @@ impl> Analytics for T { type Measurement = Vec>; fn handle_block(&mut self, block: &Block, metadata: &BlockMetadata, ctx: &dyn AnalyticsContext) { - for analytic in self.as_mut().iter_mut() { + self.as_mut().par_iter_mut().for_each(|analytic| { analytic.0.handle_block(block, metadata, ctx); - } + }) } - fn handle_transaction(&mut self, consumed: &[LedgerSpent], created: &[LedgerOutput], ctx: &dyn AnalyticsContext) { - for analytic in self.as_mut().iter_mut() { - analytic.0.handle_transaction(consumed, created, ctx); - } + fn handle_transaction( + &mut self, + payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + ctx: &dyn AnalyticsContext, + ) { + self.as_mut().par_iter_mut().for_each(|analytic| { + analytic.0.handle_transaction(payload, consumed, created, ctx); + }) } fn take_measurement(&mut self, ctx: &dyn AnalyticsContext) -> Self::Measurement { @@ -281,7 +311,7 @@ impl<'a, I: InputSource> Slot<'a, I> { .clone()) }) .collect::>>()?; - analytics.handle_transaction(&consumed, &created, ctx) + analytics.handle_transaction(payload, &consumed, &created, ctx) } } analytics.handle_block(block, &block_data.metadata, ctx); diff --git a/src/analytics/tangle/block_issuers.rs b/src/analytics/tangle/block_issuers.rs new file mode 100644 index 000000000..21930c7e8 --- /dev/null +++ b/src/analytics/tangle/block_issuers.rs @@ -0,0 +1,39 @@ +// Copyright 2023 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashSet; + +use iota_sdk::types::block::output::AccountId; + +use crate::analytics::{Analytics, AnalyticsContext}; + +#[derive(Debug, Default)] +pub(crate) struct BlockIssuerMeasurement { + pub(crate) active_issuer_count: usize, +} + +/// Computes the number of block issuers that were active during a given time interval. +#[allow(missing_docs)] +#[derive(Debug, Default)] +pub(crate) struct BlockIssuerAnalytics { + issuer_accounts: HashSet, +} + +impl Analytics for BlockIssuerAnalytics { + type Measurement = BlockIssuerMeasurement; + + fn handle_block( + &mut self, + block: &iota_sdk::types::block::Block, + _metadata: &crate::model::block_metadata::BlockMetadata, + _ctx: &dyn AnalyticsContext, + ) { + self.issuer_accounts.insert(block.issuer_id()); + } + + fn take_measurement(&mut self, _ctx: &dyn AnalyticsContext) -> Self::Measurement { + BlockIssuerMeasurement { + active_issuer_count: std::mem::take(&mut self.issuer_accounts).len(), + } + } +} diff --git a/src/analytics/tangle/mana_activity.rs b/src/analytics/tangle/mana_activity.rs new file mode 100644 index 000000000..f51bf8bb6 --- /dev/null +++ b/src/analytics/tangle/mana_activity.rs @@ -0,0 +1,70 @@ +// Copyright 2023 IOTA Stiftung +// SPDX-License-Identifier: Apache-2.0 + +use iota_sdk::types::block::{ + payload::{signed_transaction::TransactionCapabilityFlag, SignedTransactionPayload}, + protocol::WorkScore, + Block, +}; + +use crate::{ + analytics::{Analytics, AnalyticsContext}, + model::{ + block_metadata::BlockMetadata, + ledger::{LedgerOutput, LedgerSpent}, + }, +}; + +/// The type of payloads that occured within a single slot. +#[derive(Copy, Clone, Debug, Default)] +pub(crate) struct ManaActivityMeasurement { + pub(crate) rewards_claimed: u64, + pub(crate) mana_burned: u64, + pub(crate) bic_burned: u64, +} + +impl Analytics for ManaActivityMeasurement { + type Measurement = Self; + + fn handle_transaction( + &mut self, + payload: &SignedTransactionPayload, + consumed: &[LedgerSpent], + created: &[LedgerOutput], + ctx: &dyn AnalyticsContext, + ) { + if payload + .transaction() + .capabilities() + .has_capability(TransactionCapabilityFlag::BurnMana) + { + // TODO: Add reward mana + let input_mana = consumed + .iter() + .map(|o| { + // Unwrap: acceptable risk + o.output() + .available_mana(ctx.protocol_parameters(), o.output.slot_booked, ctx.slot_index()) + .unwrap() + }) + .sum::(); + let output_mana = created.iter().map(|o| o.output().mana()).sum::() + + payload.transaction().allotments().iter().map(|a| a.mana()).sum::(); + if input_mana > output_mana { + self.mana_burned += input_mana - output_mana; + } + } + } + + fn handle_block(&mut self, block: &Block, _metadata: &BlockMetadata, ctx: &dyn AnalyticsContext) { + // TODO: need RMC from INX + let rmc = 1; + if let Some(body) = block.body().as_basic_opt() { + self.bic_burned += body.work_score(ctx.protocol_parameters().work_score_parameters()) as u64 * rmc; + } + } + + fn take_measurement(&mut self, _ctx: &dyn AnalyticsContext) -> Self::Measurement { + std::mem::take(self) + } +} diff --git a/src/analytics/tangle/mod.rs b/src/analytics/tangle/mod.rs index d1c7a9f7b..89006869d 100644 --- a/src/analytics/tangle/mod.rs +++ b/src/analytics/tangle/mod.rs @@ -4,10 +4,16 @@ //! Statistics about the tangle. pub(crate) use self::{ - block_activity::BlockActivityMeasurement, protocol_params::ProtocolParamsAnalytics, slot_size::SlotSizeMeasurement, + block_activity::BlockActivityMeasurement, + block_issuers::{BlockIssuerAnalytics, BlockIssuerMeasurement}, + mana_activity::ManaActivityMeasurement, + protocol_params::ProtocolParamsAnalytics, + slot_size::SlotSizeMeasurement, }; mod block_activity; +mod block_issuers; +mod mana_activity; mod protocol_params; mod slot_size; diff --git a/src/bin/inx-chronicle/api/explorer/responses.rs b/src/bin/inx-chronicle/api/explorer/responses.rs index cd4eeba2d..5be468cb8 100644 --- a/src/bin/inx-chronicle/api/explorer/responses.rs +++ b/src/bin/inx-chronicle/api/explorer/responses.rs @@ -121,6 +121,7 @@ pub struct BlockPayloadTypeDto { #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct BlocksBySlotResponse { + pub count: usize, pub blocks: Vec, pub cursor: Option, } diff --git a/src/db/influxdb/config.rs b/src/db/influxdb/config.rs index b60775595..902dd769f 100644 --- a/src/db/influxdb/config.rs +++ b/src/db/influxdb/config.rs @@ -75,15 +75,18 @@ impl Default for InfluxDbConfig { #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, clap::ValueEnum)] pub enum AnalyticsChoice { // Please keep the alphabetic order. + ActiveAddresses, AddressBalance, BaseTokenActivity, BlockActivity, - ActiveAddresses, + BlockIssuerActivity, + Features, LedgerOutputs, LedgerSize, - SlotSize, + ManaActivity, OutputActivity, ProtocolParameters, + SlotSize, TransactionSizeDistribution, UnlockConditions, } @@ -92,15 +95,18 @@ pub enum AnalyticsChoice { pub fn all_analytics() -> HashSet { // Please keep the alphabetic order. [ + AnalyticsChoice::ActiveAddresses, AnalyticsChoice::AddressBalance, AnalyticsChoice::BaseTokenActivity, AnalyticsChoice::BlockActivity, - AnalyticsChoice::ActiveAddresses, + AnalyticsChoice::BlockIssuerActivity, + AnalyticsChoice::Features, AnalyticsChoice::LedgerOutputs, AnalyticsChoice::LedgerSize, - AnalyticsChoice::SlotSize, + AnalyticsChoice::ManaActivity, AnalyticsChoice::OutputActivity, AnalyticsChoice::ProtocolParameters, + AnalyticsChoice::SlotSize, AnalyticsChoice::TransactionSizeDistribution, AnalyticsChoice::UnlockConditions, ] diff --git a/src/db/mongodb/collections/outputs/mod.rs b/src/db/mongodb/collections/outputs/mod.rs index 7d4188ec3..82730c7b3 100644 --- a/src/db/mongodb/collections/outputs/mod.rs +++ b/src/db/mongodb/collections/outputs/mod.rs @@ -373,6 +373,16 @@ pub struct UtxoChangesResult { pub consumed_outputs: Vec, } +#[derive(Clone, Debug, Default, Deserialize)] +#[allow(missing_docs)] +pub struct AddressActivityByType { + pub ed25519_count: usize, + pub account_count: usize, + pub nft_count: usize, + pub anchor_count: usize, + pub implicit_count: usize, +} + /// Implements the queries for the core API. impl OutputCollection { /// Upserts spent ledger outputs. @@ -858,10 +868,11 @@ impl OutputCollection { &self, start_date: time::Date, end_date: time::Date, - ) -> Result { + ) -> Result { #[derive(Deserialize)] struct Res { - count: usize, + #[serde(rename = "_id")] + address: AddressDto, } let protocol_params = self @@ -875,34 +886,42 @@ impl OutputCollection { protocol_params.slot_index(end_date.midnight().assume_utc().unix_timestamp() as _), ); - Ok(self - .aggregate::( - [ - doc! { "$match": { "$or": [ - { "metadata.slot_booked": { - "$gte": start_slot.0, - "$lt": end_slot.0 - } }, - { "metadata.spent_metadata.slot_spent": { - "$gte": start_slot.0, - "$lt": end_slot.0 - } }, - ] } }, - doc! { "$group": { - "_id": "$details.address", + let mut res = AddressActivityByType::default(); + + self.aggregate::( + [ + doc! { "$match": { "$or": [ + { "metadata.slot_booked": { + "$gte": start_slot.0, + "$lt": end_slot.0 } }, - doc! { "$group": { - "_id": null, - "count": { "$sum": 1 } + { "metadata.spent_metadata.slot_spent": { + "$gte": start_slot.0, + "$lt": end_slot.0 } }, - ], - None, - ) - .await? - .map_ok(|r| r.count) - .try_next() - .await? - .unwrap_or_default()) + ] } }, + doc! { "$group": { + "_id": "$details.address", + } }, + ], + None, + ) + .await? + .map_ok(|r| r.address) + .try_for_each(|address| async move { + match address { + AddressDto::Ed25519(_) => res.ed25519_count += 1, + AddressDto::Account(_) => res.account_count += 1, + AddressDto::Nft(_) => res.nft_count += 1, + AddressDto::Anchor(_) => res.anchor_count += 1, + AddressDto::ImplicitAccountCreation(_) => res.implicit_count += 1, + _ => (), + } + Ok(()) + }) + .await?; + + Ok(res) } } diff --git a/src/inx/client.rs b/src/inx/client.rs index fac626c08..ef61f0615 100644 --- a/src/inx/client.rs +++ b/src/inx/client.rs @@ -39,15 +39,6 @@ impl Inx { self.inx.read_node_status(proto::NoParams {}).await?.try_convert() } - // /// Stream status updates from the node. - // pub async fn get_node_status_updates( - // &mut self, - // cooldown_in_milliseconds: u32, - // ) -> Result>, InxError> { Ok(self .inx - // .listen_to_node_status(proto::NodeStatusRequest { cooldown_in_milliseconds, }) .await? .into_inner() .map(|msg| - // TryConvertTo::try_convert(msg?))) - // } - /// Get the configuration of the node. pub async fn get_node_configuration(&mut self) -> Result { self.inx @@ -56,15 +47,6 @@ impl Inx { .try_convert() } - // /// Get the active root blocks of the node. - // pub async fn get_active_root_blocks(&mut self) -> Result { - // Ok(self - // .inx - // .read_active_root_blocks(proto::NoParams {}) - // .await? - // .try_convert()?) - // } - /// Get a commitment from a slot index. pub async fn get_commitment(&mut self, slot_index: SlotIndex) -> Result { self.inx