Skip to content

Commit

Permalink
fix(folders): race condition caused by uploader upon concurrent Folde…
Browse files Browse the repository at this point in the history
…rs being payed and synced
  • Loading branch information
bochaco committed Apr 2, 2024
1 parent 430dfcd commit da05ca0
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 19 deletions.
34 changes: 31 additions & 3 deletions sn_cli/src/acc_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use super::{
use sn_client::{
protocol::storage::{Chunk, RegisterAddress, RetryStrategy},
registers::EntryHash,
transfers::{DerivationIndex, MainSecretKey},
Client, FilesApi, FolderEntry, FoldersApi, Metadata, UploadCfg,
transfers::{DerivationIndex, HotWallet, MainSecretKey},
Client, FilesApi, FolderEntry, FoldersApi, Metadata, UploadCfg, WalletClient,
};

use bls::PublicKey;
Expand Down Expand Up @@ -682,7 +682,35 @@ impl AccountPacket {
.insert_entries(self.iter_only_files());
let _summary = files_uploader.start_upload().await?;

// Sync the folders. The payment is made inside sync() if required.
// Let's make the storage payment for Folders
let mut wallet_client =
WalletClient::new(self.client.clone(), HotWallet::load_from(&self.wallet_dir)?);
let mut net_addresses = vec![];
let mut new_folders = 0;
// let's collect list of addresses we need to pay for
folders.iter().for_each(|(_, (folder, folder_change))| {
if folder_change.is_new_folder() {
net_addresses.push(folder.as_net_addr());
new_folders += 1;
}
net_addresses.extend(folder.meta_addrs_to_pay());
});

let payment_result = wallet_client
.pay_for_storage(net_addresses.into_iter())
.await?;
match payment_result
.storage_cost
.checked_add(payment_result.royalty_fees)
{
Some(cost) => {
let balance = wallet_client.balance();
println!("Made payment of {cost} for {new_folders} Folders. New balance: {balance}",)
}
None => bail!("Failed to calculate total payment cost"),
}

// Sync Folders concurrently now that payments have been made.
let mut tasks = JoinSet::new();
for (path, (mut folder, folder_change)) in folders {
let op = if folder_change.is_new_folder() {
Expand Down
45 changes: 30 additions & 15 deletions sn_client/src/folders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use super::{error::Result, Client, ClientRegister, WalletClient};
use crate::{Error, UploadCfg, Uploader};
use crate::{Error, FilesApi, UploadCfg};
use bls::{Ciphertext, PublicKey};
use bytes::{BufMut, BytesMut};
use self_encryption::MAX_CHUNK_SIZE;
Expand Down Expand Up @@ -48,6 +48,7 @@ pub struct FoldersApi {
client: Client,
wallet_dir: PathBuf,
register: ClientRegister,
files_api: FilesApi,
// Cache of metadata chunks. We keep the Chunk itself till we upload it to the network.
metadata: BTreeMap<XorName, (Metadata, Option<Chunk>)>,
}
Expand Down Expand Up @@ -179,21 +180,32 @@ impl FoldersApi {
}

/// Sync local Folder with the network.
/// This makes a payment and uploads the folder if the metadata chunks and registers have not yet been paid.
pub async fn sync(&mut self, upload_cfg: UploadCfg) -> Result<()> {
let mut uploader = Uploader::new(self.client.clone(), self.wallet_dir.to_path_buf());
uploader.set_upload_cfg(upload_cfg);
uploader.set_collect_registers(true); // override upload cfg to collect the updated register.
uploader.insert_chunks(self.meta_chunks());
uploader.insert_register(vec![self.register()]);
let upload_summary = uploader.start_upload().await?;

let updated_register = upload_summary
.uploaded_registers
.get(self.address())
.ok_or(Error::RegisterNotFoundAfterUpload(self.address().xorname()))?
.clone();
self.register = updated_register;
let mut wallet_client = self.wallet()?;

// First upload any newly created metadata chunk
for (_, meta_chunk) in self.metadata.values_mut() {
if let Some(chunk) = meta_chunk.take() {
self.files_api
.get_local_payment_and_upload_chunk(
chunk.clone(),
upload_cfg.verify_store,
Some(upload_cfg.retry_strategy),
)
.await?;
}
}

let payment_info = wallet_client.get_recent_payment_for_addr(&self.as_net_addr())?;

self.register
.sync(
&mut wallet_client,
upload_cfg.verify_store,
Some(payment_info),
)
.await?;

Ok(())
}

Expand Down Expand Up @@ -285,10 +297,13 @@ impl FoldersApi {

// Create a new FoldersApi instance with given register.
fn create(client: Client, wallet_dir: &Path, register: ClientRegister) -> Result<Self> {
let files_api = FilesApi::new(client.clone(), wallet_dir.to_path_buf());

Ok(Self {
client,
wallet_dir: wallet_dir.to_path_buf(),
register,
files_api,
metadata: BTreeMap::new(),
})
}
Expand Down
18 changes: 17 additions & 1 deletion sn_client/tests/folders_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

use bls::SecretKey;
use eyre::Result;
use sn_client::test_utils::{get_funded_wallet, get_new_client, random_file_chunk};
use sn_client::test_utils::{
get_funded_wallet, get_new_client, pay_for_storage, random_file_chunk,
};
use sn_client::{FolderEntry, FoldersApi, Metadata};
use sn_protocol::{storage::ChunkAddress, NetworkAddress};
use sn_registers::{EntryHash, RegisterAddress};
Expand Down Expand Up @@ -208,6 +210,12 @@ async fn test_folder_retrieve() -> Result<()> {
let (file2_entry_hash, file2_meta_xorname, file2_metadata) =
subfolder.add_file("file2.txt".into(), file2_chunk.clone(), None)?;

// let's pay for storage
let mut addrs2pay = vec![folder.as_net_addr(), subfolder.as_net_addr()];
addrs2pay.extend(folder.meta_addrs_to_pay());
addrs2pay.extend(subfolder.meta_addrs_to_pay());
pay_for_storage(&client, wallet_dir, addrs2pay).await?;

folder.sync(Default::default()).await?;
subfolder.sync(Default::default()).await?;

Expand Down Expand Up @@ -294,6 +302,14 @@ async fn test_folder_merge_changes() -> Result<()> {
let (file_b2_entry_hash, file_b2_meta_xorname, file_b2_metadata) =
subfolder_b.add_file("fileB2.txt".into(), file_b2_chunk.clone(), None)?;

// let's pay for storage
let mut addrs2pay = vec![folder_a.as_net_addr(), subfolder_a.as_net_addr()];
addrs2pay.extend(folder_a.meta_addrs_to_pay());
addrs2pay.extend(subfolder_a.meta_addrs_to_pay());
addrs2pay.extend(folder_b.meta_addrs_to_pay());
addrs2pay.extend(subfolder_b.meta_addrs_to_pay());
pay_for_storage(&client, wallet_dir, addrs2pay).await?;

folder_a.sync(Default::default()).await?;
subfolder_a.sync(Default::default()).await?;
folder_b.sync(Default::default()).await?;
Expand Down

0 comments on commit da05ca0

Please sign in to comment.