Skip to content

Commit

Permalink
finish impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex6323 committed Nov 10, 2023
1 parent b1762a3 commit 5a6e3d0
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ async fn main() -> Result<()> {
outputs.iter().enumerate().for_each(|(i, output_data)| {
println!("OUTPUT #{i}");
println!(
"- address: {:?}\n- amount: {:?}\n- native tokens: {:?}",
output_data.address.clone().to_bech32_unchecked("rms"),
"- amount: {:?}\n- native tokens: {:?}",
output_data.output.amount(),
output_data.output.native_tokens()
)
Expand Down Expand Up @@ -83,8 +82,7 @@ async fn main() -> Result<()> {
outputs.iter().enumerate().for_each(|(i, output_data)| {
println!("OUTPUT #{i}");
println!(
"- address: {:?}\n- amount: {:?}\n- native tokens: {:?}",
output_data.address.clone().to_bech32_unchecked("rms"),
"- amount: {:?}\n- native tokens: {:?}",
output_data.output.amount(),
output_data.output.native_tokens()
)
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/types/block/address/multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct WeightedAddress {

impl WeightedAddress {
/// Creates a new [`WeightedAddress`].
pub fn new(address: Address, weight: u8) -> Result<WeightedAddress, Error> {
pub fn new(address: Address, weight: u8) -> Result<Self, Error> {
verify_address::<true>(&address, &())?;
verify_weight::<true>(&weight, &())?;

Expand Down
36 changes: 18 additions & 18 deletions sdk/src/types/block/capabilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,29 +294,29 @@ mod test {

fn as_byte(&self) -> u8 {
match self {
TestFlag::Val1 => Self::VAL_1,
TestFlag::Val2 => Self::VAL_2,
TestFlag::Val3 => Self::VAL_3,
TestFlag::Val4 => Self::VAL_4,
TestFlag::Val5 => Self::VAL_5,
TestFlag::Val6 => Self::VAL_6,
TestFlag::Val7 => Self::VAL_7,
TestFlag::Val8 => Self::VAL_8,
TestFlag::Val9 => Self::VAL_9,
Self::Val1 => Self::VAL_1,
Self::Val2 => Self::VAL_2,
Self::Val3 => Self::VAL_3,
Self::Val4 => Self::VAL_4,
Self::Val5 => Self::VAL_5,
Self::Val6 => Self::VAL_6,
Self::Val7 => Self::VAL_7,
Self::Val8 => Self::VAL_8,
Self::Val9 => Self::VAL_9,
}
}

fn index(&self) -> usize {
match self {
TestFlag::Val1
| TestFlag::Val2
| TestFlag::Val3
| TestFlag::Val4
| TestFlag::Val5
| TestFlag::Val6
| TestFlag::Val7
| TestFlag::Val8 => 0,
TestFlag::Val9 => 1,
Self::Val1
| Self::Val2
| Self::Val3
| Self::Val4
| Self::Val5
| Self::Val6
| Self::Val7
| Self::Val8 => 0,
Self::Val9 => 1,
}
}

Expand Down
2 changes: 1 addition & 1 deletion sdk/src/types/block/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ impl Output {

/// Checks whether the output is an implicit account.
pub fn is_implicit_account(&self) -> bool {
if let Output::Basic(output) = self {
if let Self::Basic(output) = self {
output.is_implicit_account()
} else {
false
Expand Down
6 changes: 3 additions & 3 deletions sdk/src/wallet/operations/output_consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ where
let mut outputs_to_consolidate = Vec::new();
let wallet_data = self.data().await;

let wallet_address = &wallet_data.address;
let wallet_address = wallet_data.address.clone();

for (output_id, output_data) in &wallet_data.unspent_outputs {
#[cfg(feature = "participation")]
Expand All @@ -143,7 +143,7 @@ where
}

let is_locked_output = wallet_data.locked_outputs.contains(output_id);
let should_consolidate_output = self.should_consolidate_output(output_data, slot_index, wallet_address)?;
let should_consolidate_output = self.should_consolidate_output(output_data, slot_index, &wallet_address)?;
if !is_locked_output && should_consolidate_output {
outputs_to_consolidate.push(output_data.clone());
}
Expand Down Expand Up @@ -254,7 +254,7 @@ where
params
.target_address
.map(|bech32| bech32.into_inner())
.unwrap_or_else(|| outputs_to_consolidate[0].address.clone()),
.unwrap_or_else(|| wallet_address.into_inner())
))
.with_native_tokens(total_native_tokens.finish()?)
.finish_output()?];
Expand Down
28 changes: 16 additions & 12 deletions sdk/src/wallet/operations/syncing/addresses/outputs.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;

use instant::Instant;

use crate::{
client::secret::SecretManage,
types::block::address::Address,
wallet::{
constants::PARALLEL_REQUESTS_AMOUNT,
task,
Expand All @@ -22,44 +25,45 @@ where
pub(crate) async fn get_outputs_from_address_output_ids(
&self,
addresses_with_unspent_outputs: Vec<AddressWithUnspentOutputs>,
) -> crate::wallet::Result<(Vec<AddressWithUnspentOutputs>, Vec<OutputData>)> {
) -> crate::wallet::Result<(Vec<(AddressWithUnspentOutputs, Vec<OutputData>)>)> {
log::debug!("[SYNC] start get_outputs_from_address_output_ids");
let address_outputs_start_time = Instant::now();

let mut addresses_with_outputs = Vec::new();
let mut outputs_data = Vec::new();

// We split the addresses into chunks so we don't get timeouts if we have thousands
for addresses_chunk in &mut addresses_with_unspent_outputs
.chunks(PARALLEL_REQUESTS_AMOUNT)
.map(|x: &[AddressWithUnspentOutputs]| x.to_vec())
{
let mut tasks = Vec::new();
for address in addresses_chunk {
for address_with_unspent_outputs in addresses_chunk {
let wallet = self.clone();
tasks.push(async move {
task::spawn(async move {
let output_responses = wallet.get_outputs(address.output_ids.clone()).await?;

let outputs = wallet
.output_response_to_output_data(output_responses, &address)
let unspent_outputs_with_metadata = wallet
.get_outputs(address_with_unspent_outputs.output_ids.clone())
.await?;
let unspent_outputs_data = wallet
.output_response_to_output_data(
unspent_outputs_with_metadata,
&address_with_unspent_outputs,
)
.await?;
crate::wallet::Result::Ok((address, outputs))
crate::wallet::Result::Ok((address_with_unspent_outputs, unspent_outputs_data))
})
.await
});
}
let results = futures::future::try_join_all(tasks).await?;
for res in results {
let (address, outputs): (AddressWithUnspentOutputs, Vec<OutputData>) = res?;
addresses_with_outputs.push(address);
outputs_data.extend(outputs);
addresses_with_outputs.push(res?);
}
}
log::debug!(
"[SYNC] finished get_outputs_from_address_output_ids in {:.2?}",
address_outputs_start_time.elapsed()
);
Ok((addresses_with_outputs, outputs_data))
Ok(addresses_with_outputs)
}
}
156 changes: 75 additions & 81 deletions sdk/src/wallet/operations/syncing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,8 @@ where

let address_to_sync = vec![wallet_address_with_unspent_outputs];

let (addresses_with_unspent_outputs, spent_or_not_synced_output_ids, outputs_data): (
Vec<AddressWithUnspentOutputs>,
Vec<OutputId>,
Vec<OutputData>,
) = self.request_outputs_recursively(address_to_sync, options).await?;
let (addresses_with_unspent_outputs, spent_or_not_synced_output_ids, outputs_data) =
self.request_outputs_recursively(address_to_sync, options).await?;

// Request possible spent outputs
log::debug!("[SYNC] spent_or_not_synced_outputs: {spent_or_not_synced_output_ids:?}");
Expand Down Expand Up @@ -167,103 +164,100 @@ where
&self,
addresses_to_sync: Vec<AddressWithUnspentOutputs>,
options: &SyncOptions,
// ) -> crate::wallet::Result<(Vec<AddressWithUnspentOutputs>, Vec<OutputId>, Vec<OutputData>)> {
) -> crate::wallet::Result<(Vec<AddressWithUnspentOutputs>, Vec<OutputId>, Vec<OutputData>)> {
// Cache the account and nft address with the related ed2559 address, so we can update the account address with
// the new output ids

let mut new_account_and_nft_addresses: HashMap<Address, Address> = HashMap::new();
let mut spent_or_not_synced_output_ids = Vec::new();
let mut addresses_with_unspent_outputs = Vec::new();
let mut outputs_data = Vec::new();
// Cache account and nft addresses with the related Ed25519 address, so we can update the account
// address with the new output ids.
let mut new_addresses: HashMap<Address, Address> = HashMap::new();
let mut addresses_with_unspent_output_ids_all = Vec::new();
let mut outputs_data_all = Vec::new();

let bech32_hrp = self.client().get_bech32_hrp().await?;

loop {
let new_outputs_data = if new_account_and_nft_addresses.is_empty() {
// Get outputs for the addresses and add them also the the addresses_with_unspent_outputs
let (unspent_output_ids, spent_or_not_synced_output_ids_inner) = self
.get_output_ids_for_addresses(addresses_to_sync.clone(), options)
.await?;

spent_or_not_synced_output_ids = spent_or_not_synced_output_ids_inner;

// Get outputs for addresses and add them also the the addresses_with_unspent_outputs
let (addresses_with_unspent_outputs_inner, outputs_data_inner) =
self.get_outputs_from_address_output_ids(unspent_output_ids).await?;

addresses_with_unspent_outputs = addresses_with_unspent_outputs_inner;
outputs_data.extend(outputs_data_inner.clone());
outputs_data_inner
} else {
let mut new_outputs_data = Vec::new();
for (account_or_nft_address, output_address) in &new_account_and_nft_addresses {
let output_ids = self
.get_output_ids_for_address(
&Bech32Address::new(bech32_hrp, account_or_nft_address.clone()),
options,
)
.await?;

// Update address with unspent outputs
let address_with_unspent_outputs = addresses_with_unspent_outputs
.iter_mut()
.find(|address| address.address.inner() == output_address)
.ok_or_else(|| {
crate::wallet::Error::WalletAddressMismatch(output_address.clone().to_bech32(bech32_hrp))
})?;
address_with_unspent_outputs.output_ids.extend(output_ids.clone());

let new_outputs_data_inner = self.get_outputs(output_ids).await?;

let outputs_data_inner = self
.output_response_to_output_data(new_outputs_data_inner, &address_with_unspent_outputs)
.await?;

outputs_data.extend(outputs_data_inner.clone());
new_outputs_data.extend(outputs_data_inner);
}
new_outputs_data
};

// Clear, so we only get new addresses
new_account_and_nft_addresses.clear();

// Add new account and nft addresses
for output_data in new_outputs_data {
match output_data.output {
Output::Account(account_output) => {
let account_address =
AccountAddress::from(account_output.account_id_non_null(&output_data.output_id));
// Get the unspent and spent/not-synced output ids per address to sync
let (addresses_to_sync_with_unspent_output_ids, mut spent_or_not_synced_output_ids) = self
.get_output_ids_for_addresses(addresses_to_sync.clone(), options)
.await?;

new_account_and_nft_addresses.insert(Address::Account(account_address), output_data.address);
}
Output::Nft(nft_output) => {
let nft_address = NftAddress::from(nft_output.nft_id_non_null(&output_data.output_id));
// Get the corresponding unspent output data
let mut new_outputs_data = self
.get_outputs_from_address_output_ids(addresses_to_sync_with_unspent_output_ids)
.await?;

new_account_and_nft_addresses.insert(Address::Nft(nft_address), output_data.address);
loop {
// Try to discover new addresses
// See https://github.com/rust-lang/rust-clippy/issues/8539 regarding this lint.
#[allow(clippy::iter_with_drain)]
for (associated_address, outputs_data) in new_outputs_data.drain(..) {
for output_data in &outputs_data {
match &output_data.output {
Output::Account(account_output) => {
let account_address =
AccountAddress::from(account_output.account_id_non_null(&output_data.output_id));

new_addresses.insert(
Address::Account(account_address),
associated_address.address.inner().clone(),
);
}
Output::Nft(nft_output) => {
let nft_address = NftAddress::from(nft_output.nft_id_non_null(&output_data.output_id));

new_addresses
.insert(Address::Nft(nft_address), associated_address.address.inner().clone());
}
_ => {}
}
_ => {}
}
addresses_with_unspent_output_ids_all.push(associated_address);
outputs_data_all.extend(outputs_data);
}

log::debug!("[SYNC] new_account_and_nft_addresses: {new_account_and_nft_addresses:?}");
if new_account_and_nft_addresses.is_empty() {
log::debug!("[SYNC] new_addresses: {new_addresses:?}");

// If there are no new addresses abort
if new_addresses.is_empty() {
break;
}

// Get the unspent outputs of the new addresses
for (address, output_address) in new_addresses.drain() {
let address_output_ids = self
.get_output_ids_for_address(
&Bech32Address::new(bech32_hrp, address),
options,
)
.await?;

// Update address with new associated unspent outputs
let address_with_unspent_output_ids = addresses_with_unspent_output_ids_all
.iter_mut()
.find(|address| address.address.inner() == &output_address)
.ok_or_else(|| {
crate::wallet::Error::WalletAddressMismatch(output_address.clone().to_bech32(bech32_hrp))
})?;
address_with_unspent_output_ids.output_ids.extend(address_output_ids.clone());

let address_outputs = self.get_outputs(address_output_ids).await?;
let address_outputs_data = self
.output_response_to_output_data(address_outputs, &address_with_unspent_output_ids)
.await?;

new_outputs_data.push((address_with_unspent_output_ids.clone(), address_outputs_data));
}
}

// get_output_ids_for_addresses() will return recursively owned outputs not anymore, sine they will only get
// synced afterwards, so we filter these unspent outputs here. Maybe the spent_or_not_synced_output_ids can be
// calculated more efficient in the future, by comparing the new and old outputs only at this point. Then this
// retain isn't needed anymore.

let unspent_output_ids: HashSet<OutputId> = HashSet::from_iter(outputs_data.iter().map(|o| o.output_id));
spent_or_not_synced_output_ids.retain(|o| !unspent_output_ids.contains(o));
let unspent_output_ids_all: HashSet<OutputId> = HashSet::from_iter(outputs_data_all.iter().map(|o| o.output_id));
spent_or_not_synced_output_ids.retain(|o| !unspent_output_ids_all.contains(o));

Ok((
addresses_with_unspent_outputs,
addresses_with_unspent_output_ids_all,
spent_or_not_synced_output_ids,
outputs_data,
outputs_data_all,
))
}
}
1 change: 0 additions & 1 deletion sdk/src/wallet/operations/syncing/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ where
metadata: *output_with_meta.metadata(),
output: output_with_meta.output().clone(),
is_spent: output_with_meta.metadata().is_spent(),
address: associated_address.address.inner.clone(),
network_id,
remainder,
chain,
Expand Down
Loading

0 comments on commit 5a6e3d0

Please sign in to comment.