Skip to content

Commit

Permalink
feat(2.0): Optimized background sync (#1797)
Browse files Browse the repository at this point in the history
* feat(nodejs): Silently stop background syncing when Wallet is dropped

* feat(2.0): Optimized background sync

* fix

* switch to tokio notify

* chore: Remove comment

* final touches

* remove unnecessary named loop

* fmt

* enhancements

* ooops

* ooops again

* fmt
  • Loading branch information
marc2332 authored Jan 10, 2024
1 parent a2cfbe3 commit 4374ef0
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 57 deletions.
9 changes: 6 additions & 3 deletions sdk/src/wallet/core/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#[cfg(feature = "storage")]
use std::collections::HashSet;
use std::sync::{atomic::AtomicUsize, Arc};
use std::sync::Arc;

use serde::Serialize;
use tokio::sync::{Mutex, RwLock};
Expand All @@ -19,7 +19,7 @@ use crate::{
client::secret::{GenerateAddressOptions, SecretManage, SecretManager},
types::block::address::{Address, Bech32Address},
wallet::{
core::{Bip44, WalletData, WalletInner},
core::{operations::background_syncing::BackgroundSyncStatus, Bip44, WalletData, WalletInner},
operations::syncing::SyncOptions,
ClientOptions, Wallet,
},
Expand Down Expand Up @@ -242,11 +242,14 @@ where
.finish()
.await?;

let background_syncing_status = tokio::sync::watch::channel(BackgroundSyncStatus::Stopped);
let background_syncing_status = (Arc::new(background_syncing_status.0), background_syncing_status.1);

// Build the wallet.
let wallet_inner = WalletInner {
default_sync_options: Mutex::new(SyncOptions::default()),
last_synced: Mutex::new(0),
background_syncing_status: AtomicUsize::new(0),
background_syncing_status,
client,
secret_manager: self.secret_manager.expect("make WalletInner::secret_manager optional?"),
#[cfg(feature = "events")]
Expand Down
9 changes: 6 additions & 3 deletions sdk/src/wallet/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub(crate) mod operations;

use std::{
collections::{HashMap, HashSet},
sync::{atomic::AtomicUsize, Arc},
sync::Arc,
};

use crypto::keys::{
Expand All @@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};

pub use self::builder::WalletBuilder;
use self::operations::background_syncing::BackgroundSyncStatus;
use super::types::{TransactionWithMetadata, TransactionWithMetadataDto};
#[cfg(feature = "events")]
use crate::wallet::events::{
Expand Down Expand Up @@ -84,8 +85,10 @@ pub struct WalletInner<S: SecretManage = SecretManager> {
// again, because sending transactions can change that
pub(crate) last_synced: Mutex<u128>,
pub(crate) default_sync_options: Mutex<SyncOptions>,
// 0 = not running, 1 = running, 2 = stopping
pub(crate) background_syncing_status: AtomicUsize,
pub(crate) background_syncing_status: (
Arc<tokio::sync::watch::Sender<BackgroundSyncStatus>>,
tokio::sync::watch::Receiver<BackgroundSyncStatus>,
),
pub(crate) client: Client,
// TODO: make this optional?
pub(crate) secret_manager: Arc<RwLock<S>>,
Expand Down
116 changes: 65 additions & 51 deletions sdk/src/wallet/core/operations/background_syncing.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,25 @@
// Copyright 2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{sync::atomic::Ordering, time::Duration};
use std::time::Duration;

use tokio::time::sleep;
use tokio::time::timeout;

use crate::{
client::secret::SecretManage,
wallet::{operations::syncing::SyncOptions, Wallet},
wallet::{operations::syncing::SyncOptions, task, Wallet},
};

/// The default interval for background syncing
pub(crate) const DEFAULT_BACKGROUNDSYNCING_INTERVAL: Duration = Duration::from_secs(7);

#[derive(Clone, PartialEq, Debug)]
pub(crate) enum BackgroundSyncStatus {
Stopped,
Running,
Stopping,
}

impl<S: 'static + SecretManage> Wallet<S>
where
crate::wallet::Error: From<S::Error>,
Expand All @@ -25,75 +32,82 @@ where
interval: Option<Duration>,
) -> crate::wallet::Result<()> {
log::debug!("[start_background_syncing]");

let (tx_background_sync, mut rx_background_sync) = self.background_syncing_status.clone();

// stop existing process if running
if self.background_syncing_status.load(Ordering::Relaxed) == 1 {
self.background_syncing_status.store(2, Ordering::Relaxed);
};
while self.background_syncing_status.load(Ordering::Relaxed) == 2 {
log::debug!("[background_syncing]: waiting for the old process to stop");
sleep(Duration::from_secs(1)).await;
if *rx_background_sync.borrow() == BackgroundSyncStatus::Running {
tx_background_sync.send(BackgroundSyncStatus::Stopping).ok();
}

self.background_syncing_status.store(1, Ordering::Relaxed);
log::debug!("[background_syncing]: waiting for the old process to stop");
rx_background_sync
.wait_for(|status| *status != BackgroundSyncStatus::Stopping)
.await
.ok();

tx_background_sync.send(BackgroundSyncStatus::Running).ok();

let wallet = self.clone();
let _background_syncing = std::thread::spawn(move || {
#[cfg(not(target_family = "wasm"))]
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
#[cfg(target_family = "wasm")]
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
'outer: loop {
log::debug!("[background_syncing]: syncing wallet");

if let Err(err) = wallet.sync(options.clone()).await {
log::debug!("[background_syncing] error: {}", err)
}

// split interval syncing to seconds so stopping the process doesn't have to wait long
let seconds = interval.unwrap_or(DEFAULT_BACKGROUNDSYNCING_INTERVAL).as_secs();
for _ in 0..seconds {
if wallet.background_syncing_status.load(Ordering::Relaxed) == 2 {
log::debug!("[background_syncing]: stopping");
break 'outer;
}
sleep(Duration::from_secs(1)).await;
}
let interval_seconds = interval.unwrap_or(DEFAULT_BACKGROUNDSYNCING_INTERVAL);

task::spawn(async move {
loop {
log::debug!("[background_syncing]: syncing wallet");

if let Err(err) = wallet.sync(options.clone()).await {
log::debug!("[background_syncing] error: {}", err)
}
wallet.background_syncing_status.store(0, Ordering::Relaxed);
log::debug!("[background_syncing]: stopped");
});

let res = timeout(interval_seconds, async {
rx_background_sync
.wait_for(|status| *status == BackgroundSyncStatus::Stopping)
.await
.is_ok()
})
.await;

// If true it means rx_background_sync changed to BackgroundSyncStatus::Stopping
if Ok(true) == res {
log::debug!("[background_syncing]: stopping");
break;
}
}
tx_background_sync.send(BackgroundSyncStatus::Stopped).ok();
log::debug!("[background_syncing]: stopped");
});
Ok(())
}

/// Request to stop the background syncing of the wallet
pub fn request_stop_background_syncing(&self) {
log::debug!("[request_stop_background_syncing]");
self.background_syncing_status.store(2, Ordering::Relaxed);
self.background_syncing_status
.0
.send(BackgroundSyncStatus::Stopping)
.ok();
}

/// Stop the background syncing of the wallet
pub async fn stop_background_syncing(&self) -> crate::wallet::Result<()> {
log::debug!("[stop_background_syncing]");
// immediately return if not running
if self.background_syncing_status.load(Ordering::Relaxed) == 0 {

let mut rx_background_sync = self.background_syncing_status.1.clone();

// immediately return if is stopped
if *rx_background_sync.borrow() == BackgroundSyncStatus::Stopped {
return Ok(());
}

// send stop request
self.request_stop_background_syncing();
// wait until it stopped
while self.background_syncing_status.load(Ordering::Relaxed) != 0 {
#[cfg(target_family = "wasm")]
gloo_timers::future::TimeoutFuture::new(10).await;
#[cfg(not(target_family = "wasm"))]
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}

// wait until it has stopped
rx_background_sync
.wait_for(|status| *status == BackgroundSyncStatus::Stopped)
.await
.ok();

Ok(())
}
}

0 comments on commit 4374ef0

Please sign in to comment.