Skip to content

Commit

Permalink
fix address balance analytic
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Coats committed Mar 15, 2024
1 parent 96f00fc commit 43ec81b
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 49 deletions.
57 changes: 40 additions & 17 deletions src/analytics/ledger/address_balance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2023 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;
use std::collections::{hash_map::Entry, HashMap};

use futures::prelude::stream::TryStreamExt;
use iota_sdk::types::block::{payload::SignedTransactionPayload, protocol::ProtocolParameters, slot::SlotIndex};
Expand Down Expand Up @@ -53,22 +53,21 @@ impl AddressBalancesAnalytics {
/// Initialize the analytics by reading the current ledger state.
pub(crate) async fn init<'a>(
protocol_parameters: &ProtocolParameters,
slot: SlotIndex,
_slot: SlotIndex,
unspent_outputs: impl IntoIterator<Item = &'a LedgerOutput>,
db: &MongoDb,
) -> Result<Self, DbError> {
db.collection::<AddressBalanceCollection>()
.collection()
.drop(None)
.await?;
let mut map = HashMap::new();
let mut balances = HashMap::new();
for output in unspent_outputs {
*map.entry(output.locked_address_at(slot, protocol_parameters))
.or_default() += output.amount();
*balances.entry(output.locked_address(protocol_parameters)).or_default() += output.amount();
}
for (address, balance) in map {
for (address, balance) in balances {
db.collection::<AddressBalanceCollection>()
.add_balance(&address, balance)
.insert_balance(&address, balance)
.await?;
}
Ok(AddressBalancesAnalytics)
Expand All @@ -87,22 +86,46 @@ impl Analytics for AddressBalancesAnalytics {
created: &[LedgerOutput],
ctx: &dyn AnalyticsContext,
) -> eyre::Result<()> {
let mut balances = HashMap::<_, u64>::new();
for output in created {
let address = output.locked_address(ctx.protocol_parameters());
let mut entry = balances.entry(address.clone());
let balance = match entry {
Entry::Occupied(ref mut o) => o.get_mut(),
Entry::Vacant(v) => {
let balance = ctx
.database()
.collection::<AddressBalanceCollection>()
.get_balance(&address)
.await?;
v.insert(balance)
}
};
*balance += output.amount();
}
for output in consumed {
ctx.database()
.collection::<AddressBalanceCollection>()
.remove_balance(
&output.output.locked_address(ctx.protocol_parameters()),
output.amount(),
)
.await?;
let address = output.output.locked_address(ctx.protocol_parameters());
let mut entry = balances.entry(address.clone());
let balance = match entry {
Entry::Occupied(ref mut o) => o.get_mut(),
Entry::Vacant(v) => {
let balance = ctx
.database()
.collection::<AddressBalanceCollection>()
.get_balance(&address)
.await?;
v.insert(balance)
}
};
*balance -= output.amount();
}

for output in created {
for (address, balance) in balances {
ctx.database()
.collection::<AddressBalanceCollection>()
.add_balance(&output.locked_address(ctx.protocol_parameters()), output.amount())
.insert_balance(&address, balance)
.await?;
}

Ok(())
}

Expand Down
35 changes: 3 additions & 32 deletions src/db/mongodb/collections/analytics/address_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,46 +89,17 @@ pub struct DistributionStat {
}

impl AddressBalanceCollection {
/// Add an amount of balance to the given address.
pub async fn add_balance(&self, address: &Address, amount: u64) -> Result<(), DbError> {
/// Insert a balance for an address.
pub async fn insert_balance(&self, address: &Address, balance: u64) -> Result<(), DbError> {
self.update_one(
doc! { "_id": AddressDto::from(address) },
vec![doc! { "$set": {
"balance": {
"$toString": { "$add": [
{ "$toDecimal": { "$ifNull": [ "$balance", 0 ] } },
{ "$toDecimal": amount.to_string() }
] }
}
} }],
doc! { "$set": { "balance": balance.to_string() } },
UpdateOptions::builder().upsert(true).build(),
)
.await?;
Ok(())
}

/// Remove an amount of balance from the given address.
pub async fn remove_balance(&self, address: &Address, amount: u64) -> Result<(), DbError> {
let address_dto = AddressDto::from(address);
self.update_one(
doc! { "_id": &address_dto },
vec![doc! { "$set": {
"balance": {
"$toString": { "$subtract": [
{ "$toDecimal": { "$ifNull": [ "$balance", 0 ] } },
{ "$toDecimal": amount.to_string() }
] }
}
} }],
None,
)
.await?;
if self.get_balance(address).await? == 0 {
self.collection().delete_one(doc! { "_id": address_dto }, None).await?;
}
Ok(())
}

/// Get the balance of an address.
pub async fn get_balance(&self, address: &Address) -> Result<u64, DbError> {
Ok(self
Expand Down

0 comments on commit 43ec81b

Please sign in to comment.