Skip to content

Commit

Permalink
Add candidates collection and fix address balances. Update analytics …
Browse files Browse the repository at this point in the history
…dashboard.
  • Loading branch information
Alex Coats committed Mar 1, 2024
1 parent 7752b05 commit 83e7295
Show file tree
Hide file tree
Showing 12 changed files with 5,826 additions and 2,358 deletions.
7,509 changes: 5,367 additions & 2,142 deletions docker/assets/grafana/dashboards/analytics_dashboard.json

Large diffs are not rendered by default.

79 changes: 69 additions & 10 deletions src/analytics/ledger/features.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
// Copyright 2023 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use futures::prelude::stream::StreamExt;
use iota_sdk::{
types::block::{
output::{
feature::{NativeTokenFeature, StakingFeature},
Feature,
AccountId, Feature,
},
payload::SignedTransactionPayload,
Block,
},
utils::serde::string,
U256,
Expand All @@ -17,7 +19,11 @@ use serde::{Deserialize, Serialize};
use super::CountAndAmount;
use crate::{
analytics::{Analytics, AnalyticsContext},
model::ledger::{LedgerOutput, LedgerSpent},
db::{mongodb::collections::AccountCandidacyCollection, MongoDb},
model::{
block_metadata::BlockMetadata,
ledger::{LedgerOutput, LedgerSpent},
},
};

#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)]
Expand All @@ -42,21 +48,33 @@ impl FeaturesMeasurement {
}

/// Initialize the analytics by reading the current ledger state.
pub(crate) fn init<'a>(unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>) -> Self {
pub(crate) async fn init<'a>(
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>,
db: &MongoDb,
) -> eyre::Result<Self> {
let mut measurement = Self::default();
for output in unspent_outputs {
if let Some(features) = output.output().features() {
for feature in features.iter() {
match feature {
Feature::NativeToken(nt) => measurement.native_tokens.add_native_token(nt),
Feature::BlockIssuer(_) => measurement.block_issuer.add_output(output),
Feature::Staking(staking) => measurement.staking.add_staking(staking),
Feature::Staking(staking) => {
measurement
.staking
.add_staking(
output.output().as_account().account_id_non_null(&output.output_id()),
staking,
db,
)
.await?
}
_ => (),
}
}
}
}
measurement
Ok(measurement)
}
}

Expand All @@ -69,18 +87,52 @@ impl Analytics for FeaturesMeasurement {
_payload: &SignedTransactionPayload,
consumed: &[LedgerSpent],
created: &[LedgerOutput],
_ctx: &dyn AnalyticsContext,
ctx: &dyn AnalyticsContext,
) -> eyre::Result<()> {
let consumed = Self::init(consumed.iter().map(|input| &input.output));
let created = Self::init(created);
let consumed = consumed.iter().map(|input| &input.output).collect::<Vec<_>>();
let consumed = Self::init(consumed, ctx.database()).await?;
let created = Self::init(created, ctx.database()).await?;

self.wrapping_add(created);
self.wrapping_sub(consumed);

Ok(())
}

async fn take_measurement(&mut self, _ctx: &dyn AnalyticsContext) -> eyre::Result<Self::Measurement> {
async fn handle_block(
&mut self,
block: &Block,
_metadata: &BlockMetadata,
ctx: &dyn AnalyticsContext,
) -> eyre::Result<()> {
if block
.body()
.as_basic_opt()
.and_then(|body| body.payload())
.map_or(false, |payload| payload.is_candidacy_announcement())
{
ctx.database()
.collection::<AccountCandidacyCollection>()
.add_candidacy_slot(&block.issuer_id(), ctx.slot_index())
.await?;
}
Ok(())
}

async fn take_measurement(&mut self, ctx: &dyn AnalyticsContext) -> eyre::Result<Self::Measurement> {
self.staking.candidate_count = ctx
.database()
.collection::<AccountCandidacyCollection>()
.get_candidates(ctx.epoch_index(), ctx.protocol_parameters())
.await?
.count()
.await;
if ctx.slot_index() == ctx.protocol_parameters().first_slot_of(ctx.epoch_index()) {
ctx.database()
.collection::<AccountCandidacyCollection>()
.clear_expired_data(ctx.epoch_index(), ctx.protocol_parameters())
.await?;
}
Ok(*self)
}
}
Expand Down Expand Up @@ -116,6 +168,7 @@ impl NativeTokensCountAndAmount {
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)]
pub(crate) struct StakingCountAndAmount {
pub(crate) count: usize,
pub(crate) candidate_count: usize,
#[serde(with = "string")]
pub(crate) staked_amount: u64,
}
Expand All @@ -124,19 +177,25 @@ impl StakingCountAndAmount {
fn wrapping_add(&mut self, rhs: Self) {
*self = Self {
count: self.count.wrapping_add(rhs.count),
candidate_count: self.candidate_count.wrapping_add(rhs.count),
staked_amount: self.staked_amount.wrapping_add(rhs.staked_amount),
}
}

fn wrapping_sub(&mut self, rhs: Self) {
*self = Self {
count: self.count.wrapping_sub(rhs.count),
candidate_count: self.candidate_count.wrapping_sub(rhs.count),
staked_amount: self.staked_amount.wrapping_sub(rhs.staked_amount),
}
}

fn add_staking(&mut self, staking: &StakingFeature) {
async fn add_staking(&mut self, account_id: AccountId, staking: &StakingFeature, db: &MongoDb) -> eyre::Result<()> {
self.count += 1;
self.staked_amount += staking.staked_amount();
db.collection::<AccountCandidacyCollection>()
.add_staking_account(&account_id, staking.start_epoch(), staking.end_epoch())
.await?;
Ok(())
}
}
8 changes: 6 additions & 2 deletions src/analytics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use iota_sdk::types::block::{
output::OutputId,
payload::SignedTransactionPayload,
protocol::ProtocolParameters,
slot::{SlotCommitment, SlotIndex},
slot::{EpochIndex, SlotCommitment, SlotIndex},
Block,
};
use thiserror::Error;
Expand Down Expand Up @@ -50,6 +50,10 @@ pub trait AnalyticsContext: Send + Sync {
self.slot_commitment().slot()
}

fn epoch_index(&self) -> EpochIndex {
self.protocol_parameters().epoch_index_of(self.slot_commitment().slot())
}

fn slot_commitment(&self) -> &SlotCommitment;

fn database(&self) -> &MongoDb;
Expand Down Expand Up @@ -200,7 +204,7 @@ impl Analytic {
AnalyticsChoice::AddressBalance => {
Box::new(AddressBalancesAnalytics::init(protocol_params, slot, unspent_outputs, db).await?) as _
}
AnalyticsChoice::Features => Box::new(FeaturesMeasurement::init(unspent_outputs)) as _,
AnalyticsChoice::Features => Box::new(FeaturesMeasurement::init(unspent_outputs, db).await?) as _,
AnalyticsChoice::LedgerOutputs => Box::new(LedgerOutputMeasurement::init(unspent_outputs)) as _,
AnalyticsChoice::LedgerSize => Box::new(LedgerSizeAnalytics::init(protocol_params, unspent_outputs)) as _,
AnalyticsChoice::UnlockConditions => Box::new(UnlockConditionMeasurement::init(unspent_outputs)) as _,
Expand Down
2 changes: 1 addition & 1 deletion src/bin/inx-chronicle/cli/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ pub async fn fill_analytics<I: 'static + InputSource + Clone>(
// getting the previous slot data.
let ledger_state = if slot.index().0 > 0 {
db.collection::<OutputCollection>()
.get_unspent_output_stream(slot.index() - 1)
.get_unspent_output_stream(slot.index().0.saturating_sub(1).into())
.await?
.try_collect::<Vec<_>>()
.await?
Expand Down
2 changes: 1 addition & 1 deletion src/bin/inx-chronicle/inx/influx/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl InxWorker {
let ledger_state = self
.db
.collection::<OutputCollection>()
.get_unspent_output_stream(slot.index() - 1)
.get_unspent_output_stream(slot.index().0.saturating_sub(1).into())
.await?
.try_collect::<Vec<_>>()
.await?;
Expand Down
5 changes: 4 additions & 1 deletion src/bin/inx-chronicle/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ async fn build_indexes(db: &MongoDb) -> eyre::Result<()> {
db.create_indexes::<collections::LedgerUpdateCollection>().await?;
db.create_indexes::<collections::CommittedSlotCollection>().await?;
#[cfg(feature = "analytics")]
db.create_indexes::<collections::AddressBalanceCollection>().await?;
{
db.create_indexes::<collections::AddressBalanceCollection>().await?;
db.create_indexes::<collections::AccountCandidacyCollection>().await?;
}
let end_indexes = db.get_index_names().await?;
for (collection, indexes) in end_indexes {
if let Some(old_indexes) = start_indexes.get(&collection) {
Expand Down
169 changes: 169 additions & 0 deletions src/db/mongodb/collections/analytics/account_candidacy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
// Copyright 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use futures::{prelude::stream::TryStreamExt, Stream};
use iota_sdk::types::block::{
output::AccountId,
protocol::ProtocolParameters,
slot::{EpochIndex, SlotIndex},
};
use mongodb::{
bson::doc,
options::{IndexOptions, UpdateOptions},
IndexModel,
};
use serde::{Deserialize, Serialize};

use crate::{
db::{mongodb::DbError, MongoDb, MongoDbCollection, MongoDbCollectionExt},
model::SerializeToBson,
};

/// The MongoDb document representation of address balances.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct AccountCandidacyDocument {
#[serde(rename = "_id")]
pub account_id: AccountId,
pub staking_start_epoch: EpochIndex,
pub staking_end_epoch: EpochIndex,
pub candidacy_slots: Option<Vec<SlotIndex>>,
}

/// A collection to store analytics address balances.
pub struct AccountCandidacyCollection {
collection: mongodb::Collection<AccountCandidacyDocument>,
}

#[async_trait::async_trait]
impl MongoDbCollection for AccountCandidacyCollection {
const NAME: &'static str = "analytics_candidacy_announcement";
type Document = AccountCandidacyDocument;

fn instantiate(_db: &MongoDb, collection: mongodb::Collection<Self::Document>) -> Self {
Self { collection }
}

fn collection(&self) -> &mongodb::Collection<Self::Document> {
&self.collection
}

async fn create_indexes(&self) -> Result<(), DbError> {
self.create_index(
IndexModel::builder()
.keys(doc! { "staking_end_epoch": 1, "staking_start_epoch": 1 })
.options(
IndexOptions::builder()
.name("candidate_index".to_string())
.partial_filter_expression(doc! {
"candidacy_slot": { "$exists": true },
})
.build(),
)
.build(),
None,
)
.await?;

Ok(())
}
}

impl AccountCandidacyCollection {
/// Add an account with a staking epoch range.
pub async fn add_staking_account(
&self,
account_id: &AccountId,
EpochIndex(staking_start_epoch): EpochIndex,
EpochIndex(staking_end_epoch): EpochIndex,
) -> Result<(), DbError> {
self.update_one(
doc! { "_id": account_id.to_bson() },
doc! { "$set": {
"staking_start_epoch": staking_start_epoch,
"staking_end_epoch": staking_end_epoch,
} },
UpdateOptions::builder().upsert(true).build(),
)
.await?;
Ok(())
}

/// Add a candidacy announcement slot to an account.
pub async fn add_candidacy_slot(
&self,
account_id: &AccountId,
SlotIndex(candidacy_slot): SlotIndex,
) -> Result<(), DbError> {
self.update_many(
doc! {
"_id.account_id": account_id.to_bson(),
},
doc! { "$addToSet": {
"candidacy_slots": candidacy_slot,
} },
None,
)
.await?;
Ok(())
}

/// Get all candidates at the candidate epoch.
pub async fn get_candidates(
&self,
EpochIndex(candidate_epoch): EpochIndex,
protocol_parameters: &ProtocolParameters,
) -> Result<impl Stream<Item = Result<AccountId, DbError>>, DbError> {
let SlotIndex(start_slot) = protocol_parameters.first_slot_of(candidate_epoch.saturating_sub(1));
let SlotIndex(registration_slot) = protocol_parameters.registration_slot(candidate_epoch.into());
Ok(self
.find::<AccountCandidacyDocument>(
doc! {
"staking_start_epoch": { "$lte": candidate_epoch },
"staking_end_epoch": { "$gte": candidate_epoch },
"candidacy_slots": { "$exists": true },
"candidacy_slots": {
"$elemMatch": {
"$gte": start_slot,
"$lte": registration_slot,
}
},
},
None,
)
.await?
.map_err(Into::into)
.map_ok(|doc| doc.account_id))
}

/// Clears data that is outside of the range implied by the candidate epoch.
pub async fn clear_expired_data(
&self,
EpochIndex(candidate_epoch): EpochIndex,
protocol_parameters: &ProtocolParameters,
) -> Result<(), DbError> {
let SlotIndex(start_slot) = protocol_parameters.first_slot_of(candidate_epoch.saturating_sub(1));
self.collection()
.delete_many(
doc! {
"staking_end_epoch": { "$lt": candidate_epoch },
},
None,
)
.await?;
self.update_many(
doc! {
"staking_start_epoch": { "$lte": candidate_epoch },
"staking_end_epoch": { "$gte": candidate_epoch },
"candidacy_slots": { "$exists": true },
},
doc! {
"$pull": { "candidacy_slots": {
"$lt": start_slot,
} }
},
None,
)
.await?;
Ok(())
}
}
Loading

0 comments on commit 83e7295

Please sign in to comment.