diff --git a/sdk/src/wallet/core/builder.rs b/sdk/src/wallet/core/builder.rs index c9aa3e7e23..23eca5fe22 100644 --- a/sdk/src/wallet/core/builder.rs +++ b/sdk/src/wallet/core/builder.rs @@ -3,7 +3,7 @@ #[cfg(feature = "storage")] use std::collections::HashSet; -use std::sync::{atomic::AtomicUsize, Arc}; +use std::sync::Arc; use serde::Serialize; use tokio::sync::{Mutex, RwLock}; @@ -19,7 +19,7 @@ use crate::{ client::secret::{GenerateAddressOptions, SecretManage, SecretManager}, types::block::address::{Address, Bech32Address}, wallet::{ - core::{Bip44, WalletData, WalletInner}, + core::{operations::background_syncing::BackgroundSyncStatus, Bip44, WalletData, WalletInner}, operations::syncing::SyncOptions, ClientOptions, Wallet, }, @@ -242,11 +242,14 @@ where .finish() .await?; + let background_syncing_status = tokio::sync::watch::channel(BackgroundSyncStatus::Stopped); + let background_syncing_status = (Arc::new(background_syncing_status.0), background_syncing_status.1); + // Build the wallet. let wallet_inner = WalletInner { default_sync_options: Mutex::new(SyncOptions::default()), last_synced: Mutex::new(0), - background_syncing_status: AtomicUsize::new(0), + background_syncing_status, client, secret_manager: self.secret_manager.expect("make WalletInner::secret_manager optional?"), #[cfg(feature = "events")] diff --git a/sdk/src/wallet/core/mod.rs b/sdk/src/wallet/core/mod.rs index 88f0e3bc2a..16db6cdf27 100644 --- a/sdk/src/wallet/core/mod.rs +++ b/sdk/src/wallet/core/mod.rs @@ -6,7 +6,7 @@ pub(crate) mod operations; use std::{ collections::{HashMap, HashSet}, - sync::{atomic::AtomicUsize, Arc}, + sync::Arc, }; use crypto::keys::{ @@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, RwLock}; pub use self::builder::WalletBuilder; +use self::operations::background_syncing::BackgroundSyncStatus; use super::types::{TransactionWithMetadata, TransactionWithMetadataDto}; #[cfg(feature = "events")] use crate::wallet::events::{ @@ -84,8 +85,10 @@ pub struct WalletInner { // again, because sending transactions can change that pub(crate) last_synced: Mutex, pub(crate) default_sync_options: Mutex, - // 0 = not running, 1 = running, 2 = stopping - pub(crate) background_syncing_status: AtomicUsize, + pub(crate) background_syncing_status: ( + Arc>, + tokio::sync::watch::Receiver, + ), pub(crate) client: Client, // TODO: make this optional? pub(crate) secret_manager: Arc>, diff --git a/sdk/src/wallet/core/operations/background_syncing.rs b/sdk/src/wallet/core/operations/background_syncing.rs index 9bea901acc..7328441fef 100644 --- a/sdk/src/wallet/core/operations/background_syncing.rs +++ b/sdk/src/wallet/core/operations/background_syncing.rs @@ -1,18 +1,25 @@ // Copyright 2021 IOTA Stiftung // SPDX-License-Identifier: Apache-2.0 -use std::{sync::atomic::Ordering, time::Duration}; +use std::time::Duration; -use tokio::time::sleep; +use tokio::time::timeout; use crate::{ client::secret::SecretManage, - wallet::{operations::syncing::SyncOptions, Wallet}, + wallet::{operations::syncing::SyncOptions, task, Wallet}, }; /// The default interval for background syncing pub(crate) const DEFAULT_BACKGROUNDSYNCING_INTERVAL: Duration = Duration::from_secs(7); +#[derive(Clone, PartialEq, Debug)] +pub(crate) enum BackgroundSyncStatus { + Stopped, + Running, + Stopping, +} + impl Wallet where crate::wallet::Error: From, @@ -25,49 +32,49 @@ where interval: Option, ) -> crate::wallet::Result<()> { log::debug!("[start_background_syncing]"); + + let (tx_background_sync, mut rx_background_sync) = self.background_syncing_status.clone(); + // stop existing process if running - if self.background_syncing_status.load(Ordering::Relaxed) == 1 { - self.background_syncing_status.store(2, Ordering::Relaxed); - }; - while self.background_syncing_status.load(Ordering::Relaxed) == 2 { - log::debug!("[background_syncing]: waiting for the old process to stop"); - sleep(Duration::from_secs(1)).await; + if *rx_background_sync.borrow() == BackgroundSyncStatus::Running { + tx_background_sync.send(BackgroundSyncStatus::Stopping).ok(); } - self.background_syncing_status.store(1, Ordering::Relaxed); + log::debug!("[background_syncing]: waiting for the old process to stop"); + rx_background_sync + .wait_for(|status| *status != BackgroundSyncStatus::Stopping) + .await + .ok(); + + tx_background_sync.send(BackgroundSyncStatus::Running).ok(); + let wallet = self.clone(); - let _background_syncing = std::thread::spawn(move || { - #[cfg(not(target_family = "wasm"))] - let runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - #[cfg(target_family = "wasm")] - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - runtime.block_on(async { - 'outer: loop { - log::debug!("[background_syncing]: syncing wallet"); - - if let Err(err) = wallet.sync(options.clone()).await { - log::debug!("[background_syncing] error: {}", err) - } - - // split interval syncing to seconds so stopping the process doesn't have to wait long - let seconds = interval.unwrap_or(DEFAULT_BACKGROUNDSYNCING_INTERVAL).as_secs(); - for _ in 0..seconds { - if wallet.background_syncing_status.load(Ordering::Relaxed) == 2 { - log::debug!("[background_syncing]: stopping"); - break 'outer; - } - sleep(Duration::from_secs(1)).await; - } + let interval_seconds = interval.unwrap_or(DEFAULT_BACKGROUNDSYNCING_INTERVAL); + + task::spawn(async move { + loop { + log::debug!("[background_syncing]: syncing wallet"); + + if let Err(err) = wallet.sync(options.clone()).await { + log::debug!("[background_syncing] error: {}", err) } - wallet.background_syncing_status.store(0, Ordering::Relaxed); - log::debug!("[background_syncing]: stopped"); - }); + + let res = timeout(interval_seconds, async { + rx_background_sync + .wait_for(|status| *status == BackgroundSyncStatus::Stopping) + .await + .is_ok() + }) + .await; + + // If true it means rx_background_sync changed to BackgroundSyncStatus::Stopping + if Ok(true) == res { + log::debug!("[background_syncing]: stopping"); + break; + } + } + tx_background_sync.send(BackgroundSyncStatus::Stopped).ok(); + log::debug!("[background_syncing]: stopped"); }); Ok(()) } @@ -75,25 +82,32 @@ where /// Request to stop the background syncing of the wallet pub fn request_stop_background_syncing(&self) { log::debug!("[request_stop_background_syncing]"); - self.background_syncing_status.store(2, Ordering::Relaxed); + self.background_syncing_status + .0 + .send(BackgroundSyncStatus::Stopping) + .ok(); } /// Stop the background syncing of the wallet pub async fn stop_background_syncing(&self) -> crate::wallet::Result<()> { log::debug!("[stop_background_syncing]"); - // immediately return if not running - if self.background_syncing_status.load(Ordering::Relaxed) == 0 { + + let mut rx_background_sync = self.background_syncing_status.1.clone(); + + // immediately return if is stopped + if *rx_background_sync.borrow() == BackgroundSyncStatus::Stopped { return Ok(()); } + // send stop request self.request_stop_background_syncing(); - // wait until it stopped - while self.background_syncing_status.load(Ordering::Relaxed) != 0 { - #[cfg(target_family = "wasm")] - gloo_timers::future::TimeoutFuture::new(10).await; - #[cfg(not(target_family = "wasm"))] - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - } + + // wait until it has stopped + rx_background_sync + .wait_for(|status| *status == BackgroundSyncStatus::Stopped) + .await + .ok(); + Ok(()) } }