From da05ca03e04700aec3338164606e5094d0461053 Mon Sep 17 00:00:00 2001 From: bochaco Date: Mon, 1 Apr 2024 17:06:22 -0300 Subject: [PATCH] fix(folders): race condition caused by uploader upon concurrent Folders being payed and synced --- sn_cli/src/acc_packet.rs | 34 ++++++++++++++++++++++--- sn_client/src/folders.rs | 45 ++++++++++++++++++++++------------ sn_client/tests/folders_api.rs | 18 +++++++++++++- 3 files changed, 78 insertions(+), 19 deletions(-) diff --git a/sn_cli/src/acc_packet.rs b/sn_cli/src/acc_packet.rs index 11f7d4e5b1..ec20c5ac72 100644 --- a/sn_cli/src/acc_packet.rs +++ b/sn_cli/src/acc_packet.rs @@ -18,8 +18,8 @@ use super::{ use sn_client::{ protocol::storage::{Chunk, RegisterAddress, RetryStrategy}, registers::EntryHash, - transfers::{DerivationIndex, MainSecretKey}, - Client, FilesApi, FolderEntry, FoldersApi, Metadata, UploadCfg, + transfers::{DerivationIndex, HotWallet, MainSecretKey}, + Client, FilesApi, FolderEntry, FoldersApi, Metadata, UploadCfg, WalletClient, }; use bls::PublicKey; @@ -682,7 +682,35 @@ impl AccountPacket { .insert_entries(self.iter_only_files()); let _summary = files_uploader.start_upload().await?; - // Sync the folders. The payment is made inside sync() if required. + // Let's make the storage payment for Folders + let mut wallet_client = + WalletClient::new(self.client.clone(), HotWallet::load_from(&self.wallet_dir)?); + let mut net_addresses = vec![]; + let mut new_folders = 0; + // let's collect list of addresses we need to pay for + folders.iter().for_each(|(_, (folder, folder_change))| { + if folder_change.is_new_folder() { + net_addresses.push(folder.as_net_addr()); + new_folders += 1; + } + net_addresses.extend(folder.meta_addrs_to_pay()); + }); + + let payment_result = wallet_client + .pay_for_storage(net_addresses.into_iter()) + .await?; + match payment_result + .storage_cost + .checked_add(payment_result.royalty_fees) + { + Some(cost) => { + let balance = wallet_client.balance(); + println!("Made payment of {cost} for {new_folders} Folders. New balance: {balance}",) + } + None => bail!("Failed to calculate total payment cost"), + } + + // Sync Folders concurrently now that payments have been made. let mut tasks = JoinSet::new(); for (path, (mut folder, folder_change)) in folders { let op = if folder_change.is_new_folder() { diff --git a/sn_client/src/folders.rs b/sn_client/src/folders.rs index a2fe92b11c..624a9e7f4e 100644 --- a/sn_client/src/folders.rs +++ b/sn_client/src/folders.rs @@ -7,7 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. use super::{error::Result, Client, ClientRegister, WalletClient}; -use crate::{Error, UploadCfg, Uploader}; +use crate::{Error, FilesApi, UploadCfg}; use bls::{Ciphertext, PublicKey}; use bytes::{BufMut, BytesMut}; use self_encryption::MAX_CHUNK_SIZE; @@ -48,6 +48,7 @@ pub struct FoldersApi { client: Client, wallet_dir: PathBuf, register: ClientRegister, + files_api: FilesApi, // Cache of metadata chunks. We keep the Chunk itself till we upload it to the network. metadata: BTreeMap)>, } @@ -179,21 +180,32 @@ impl FoldersApi { } /// Sync local Folder with the network. - /// This makes a payment and uploads the folder if the metadata chunks and registers have not yet been paid. pub async fn sync(&mut self, upload_cfg: UploadCfg) -> Result<()> { - let mut uploader = Uploader::new(self.client.clone(), self.wallet_dir.to_path_buf()); - uploader.set_upload_cfg(upload_cfg); - uploader.set_collect_registers(true); // override upload cfg to collect the updated register. - uploader.insert_chunks(self.meta_chunks()); - uploader.insert_register(vec![self.register()]); - let upload_summary = uploader.start_upload().await?; - - let updated_register = upload_summary - .uploaded_registers - .get(self.address()) - .ok_or(Error::RegisterNotFoundAfterUpload(self.address().xorname()))? - .clone(); - self.register = updated_register; + let mut wallet_client = self.wallet()?; + + // First upload any newly created metadata chunk + for (_, meta_chunk) in self.metadata.values_mut() { + if let Some(chunk) = meta_chunk.take() { + self.files_api + .get_local_payment_and_upload_chunk( + chunk.clone(), + upload_cfg.verify_store, + Some(upload_cfg.retry_strategy), + ) + .await?; + } + } + + let payment_info = wallet_client.get_recent_payment_for_addr(&self.as_net_addr())?; + + self.register + .sync( + &mut wallet_client, + upload_cfg.verify_store, + Some(payment_info), + ) + .await?; + Ok(()) } @@ -285,10 +297,13 @@ impl FoldersApi { // Create a new FoldersApi instance with given register. fn create(client: Client, wallet_dir: &Path, register: ClientRegister) -> Result { + let files_api = FilesApi::new(client.clone(), wallet_dir.to_path_buf()); + Ok(Self { client, wallet_dir: wallet_dir.to_path_buf(), register, + files_api, metadata: BTreeMap::new(), }) } diff --git a/sn_client/tests/folders_api.rs b/sn_client/tests/folders_api.rs index 609c4abb5c..4b7c74fc9f 100644 --- a/sn_client/tests/folders_api.rs +++ b/sn_client/tests/folders_api.rs @@ -10,7 +10,9 @@ use bls::SecretKey; use eyre::Result; -use sn_client::test_utils::{get_funded_wallet, get_new_client, random_file_chunk}; +use sn_client::test_utils::{ + get_funded_wallet, get_new_client, pay_for_storage, random_file_chunk, +}; use sn_client::{FolderEntry, FoldersApi, Metadata}; use sn_protocol::{storage::ChunkAddress, NetworkAddress}; use sn_registers::{EntryHash, RegisterAddress}; @@ -208,6 +210,12 @@ async fn test_folder_retrieve() -> Result<()> { let (file2_entry_hash, file2_meta_xorname, file2_metadata) = subfolder.add_file("file2.txt".into(), file2_chunk.clone(), None)?; + // let's pay for storage + let mut addrs2pay = vec![folder.as_net_addr(), subfolder.as_net_addr()]; + addrs2pay.extend(folder.meta_addrs_to_pay()); + addrs2pay.extend(subfolder.meta_addrs_to_pay()); + pay_for_storage(&client, wallet_dir, addrs2pay).await?; + folder.sync(Default::default()).await?; subfolder.sync(Default::default()).await?; @@ -294,6 +302,14 @@ async fn test_folder_merge_changes() -> Result<()> { let (file_b2_entry_hash, file_b2_meta_xorname, file_b2_metadata) = subfolder_b.add_file("fileB2.txt".into(), file_b2_chunk.clone(), None)?; + // let's pay for storage + let mut addrs2pay = vec![folder_a.as_net_addr(), subfolder_a.as_net_addr()]; + addrs2pay.extend(folder_a.meta_addrs_to_pay()); + addrs2pay.extend(subfolder_a.meta_addrs_to_pay()); + addrs2pay.extend(folder_b.meta_addrs_to_pay()); + addrs2pay.extend(subfolder_b.meta_addrs_to_pay()); + pay_for_storage(&client, wallet_dir, addrs2pay).await?; + folder_a.sync(Default::default()).await?; subfolder_a.sync(Default::default()).await?; folder_b.sync(Default::default()).await?;