Skip to content

Commit

Permalink
fix(client): remove sync_multiple methods
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandSherwin authored and bochaco committed Mar 28, 2024
1 parent ab25487 commit 6b4c374
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 172 deletions.
72 changes: 50 additions & 22 deletions sn_cli/src/acc_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use color_eyre::{
use sn_client::{
protocol::storage::{Chunk, RegisterAddress, RetryStrategy},
registers::EntryHash,
Client, FilesApi, FolderEntry, FoldersApi, Metadata, UploadCfg, UploadSummary,
Client, FilesApi, FolderEntry, FoldersApi, Metadata, UploadCfg,
};
use std::{
collections::{
Expand All @@ -33,6 +33,7 @@ use std::{
io::Write,
path::{Path, PathBuf},
};
use tokio::task::JoinSet;
use tracing::trace;
use walkdir::{DirEntry, WalkDir};
use xor_name::XorName;
Expand Down Expand Up @@ -221,10 +222,8 @@ impl AccountPacket {
/// Sync local changes made to files and folder with their version on the network,
/// both pushing and pulling changes to/form the network.
pub async fn sync(&mut self, upload_cfg: UploadCfg, make_data_public: bool) -> Result<()> {
let ChangesToApply {
mut folders,
mutations,
} = self.scan_files_and_folders_for_changes(make_data_public)?;
let ChangesToApply { folders, mutations } =
self.scan_files_and_folders_for_changes(make_data_public)?;

if mutations.is_empty() {
println!("No local changes made to files/folders to be pushed to network.");
Expand All @@ -234,8 +233,8 @@ impl AccountPacket {
}

println!("Paying for folders hierarchy and uploading...");
let _synced_folders = self
.pay_and_sync(&mut folders, upload_cfg.clone(), make_data_public)
let synced_folders = self
.pay_and_sync_folders(folders, upload_cfg, make_data_public)
.await?;

// mark root folder as created if it wasn't already
Expand Down Expand Up @@ -266,7 +265,7 @@ impl AccountPacket {
}

// download files/folders which are new in the synced folders
let folders_to_download: Vec<_> = folders
let folders_to_download: Vec<_> = synced_folders
.iter()
.map(|(path, (folders_api, _))| {
let folder_name: OsString = path.file_name().unwrap_or_default().into();
Expand Down Expand Up @@ -633,28 +632,57 @@ impl AccountPacket {
.filter(|e| e.file_type().is_file())
}

// Pay and upload all the files and their metadata along with the folders (registers).
// This also merged the folder with the one from the network.
async fn pay_and_sync(
// Pay and upload all the files and folder.
async fn pay_and_sync_folders(
&self,
folders: &mut Folders,
folders: Folders,
upload_cfg: UploadCfg,
make_data_public: bool,
) -> Result<UploadSummary> {
) -> Result<Folders> {
let files_uploader = FilesUploader::new(self.client.clone(), self.wallet_dir.clone())
.set_upload_cfg(upload_cfg.clone())
.set_upload_cfg(upload_cfg)
.set_make_data_public(make_data_public)
.insert_entries(self.iter_only_files());
let _summary = files_uploader.start_upload().await?;

let files_summary = files_uploader.start_upload().await?;
// Sync the folders. The payment is made inside sync() if required.
let mut tasks = JoinSet::new();
for (path, (mut folder, folder_change)) in folders {
let op = if folder_change.is_new_folder() {
"Creation"
} else {
"Syncing"
};

// batch sync the folders
let folder_summary = FoldersApi::sync_multiple(
folders.iter_mut().map(|(_, (folder, _))| folder),
upload_cfg,
)
.await?;
Ok(files_summary.merge(folder_summary)?)
tasks.spawn(async move {
match folder.sync(upload_cfg).await {
Ok(()) => {
println!(
"{op} of Folder (for {path:?}) succeeded. Address: {}",
folder.address().to_hex()
);
}
Err(err) => {
println!("{op} of Folder (for {path:?}) failed: {err}")
}
}
(path, folder, folder_change)
});
}

let mut synced_folders = Folders::new();
while let Some(res) = tasks.join_next().await {
match res {
Ok((path, folder, c)) => {
synced_folders.insert(path, (folder, c));
}
Err(err) => {
println!("Failed to sync/create a Folder with/on the network: {err:?}");
}
}
}

Ok(synced_folders)
}

// Download a Folders and their files from the network and generate tracking info
Expand Down
2 changes: 1 addition & 1 deletion sn_cli/src/files/files_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl FilesUploader {
let now = Instant::now();

let mut uploader = Uploader::new(self.client, self.root_dir);
uploader.set_upload_cfg(self.upload_cfg.clone());
uploader.set_upload_cfg(self.upload_cfg);
uploader.insert_chunk_paths(chunks_to_upload);

let events_handle = Self::spawn_upload_events_handler(
Expand Down
7 changes: 0 additions & 7 deletions sn_client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,6 @@ pub enum Error {
#[error("Task completion notification channel is done")]
FailedToReadFromNotificationChannel,

// ------ Batch Sync ------
#[error("Inconsistent wallet directory during batch sync")]
InconsistentBatchSyncState,

#[error("Batch sync encountered an empty list")]
BatchSyncEmptyList,

#[error("Could not find register after batch sync: {0:?}")]
RegisterNotFoundAfterUpload(XorName),

Expand Down
86 changes: 17 additions & 69 deletions sn_client/src/folders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
// permissions and limitations relating to use of the SAFE Network Software.

use super::{error::Result, Client, ClientRegister, WalletClient};
use crate::{uploader::UploadCfg, Error, FilesApi, UploadSummary, Uploader};
use crate::{Error, UploadCfg, Uploader};
use bls::{Ciphertext, PublicKey};
use bytes::{BufMut, BytesMut};
use self_encryption::MAX_CHUNK_SIZE;
use serde::{Deserialize, Serialize};
use sn_protocol::{
storage::{Chunk, ChunkAddress, RegisterAddress, RetryStrategy},
storage::{Chunk, ChunkAddress, RegisterAddress},
NetworkAddress,
};
use sn_registers::{Entry, EntryHash};
Expand Down Expand Up @@ -48,7 +48,6 @@ pub struct FoldersApi {
client: Client,
wallet_dir: PathBuf,
register: ClientRegister,
files_api: FilesApi,
// Cache of metadata chunks. We keep the Chunk itself till we upload it to the network.
metadata: BTreeMap<XorName, (Metadata, Option<Chunk>)>,
}
Expand Down Expand Up @@ -180,70 +179,22 @@ impl FoldersApi {
}

/// Sync local Folder with the network.
pub async fn sync(
&mut self,
verify_store: bool,
retry_strategy: Option<RetryStrategy>,
) -> Result<()> {
let mut wallet_client = self.wallet()?;

// First upload any newly created metadata chunk
for (_, meta_chunk) in self.metadata.values_mut() {
if let Some(chunk) = meta_chunk.take() {
self.files_api
.get_local_payment_and_upload_chunk(chunk.clone(), verify_store, retry_strategy)
.await?;
}
}

let payment_info = wallet_client.get_non_expired_payment_for_addr(&self.as_net_addr())?;

self.register
.sync(&mut wallet_client, verify_store, Some(payment_info))
.await?;

Ok(())
}

/// Sync multiple Folders with the network.
pub async fn sync_multiple(
folders: impl IntoIterator<Item = &mut Self>,
upload_cfg: UploadCfg,
) -> Result<UploadSummary> {
let folders_vec = folders.into_iter().collect::<Vec<_>>();
// Ensure there's only one unique root_dir across all folders
let unique_root_dirs: BTreeSet<&Path> = folders_vec
.iter()
.map(|folder| folder.wallet_dir.as_path())
.collect();
let root_dir = match unique_root_dirs.iter().next() {
Some(&dir) if unique_root_dirs.len() == 1 => dir,
_ => return Err(Error::InconsistentBatchSyncState),
};
let client = folders_vec
.first()
.ok_or(Error::BatchSyncEmptyList)?
.client
.clone();

let mut uploader = Uploader::new(client, root_dir.to_path_buf());
/// This makes a payment and uploads the folder if the metadata chunks and registers have not yet been paid.
pub async fn sync(&mut self, upload_cfg: UploadCfg) -> Result<()> {
let mut uploader = Uploader::new(self.client.clone(), self.wallet_dir.to_path_buf());
uploader.set_upload_cfg(upload_cfg);
uploader.set_collect_registers(true); // override upload cfg to collect all registers
uploader.insert_chunks(folders_vec.iter().flat_map(|folder| folder.meta_chunks()));
uploader.insert_register(folders_vec.iter().map(|folder| folder.register()));
let mut upload_summary = uploader.start_upload().await?;

// now update the registers
for folder in folders_vec {
let address = folder.address();
let updated_register = upload_summary
.uploaded_registers
.remove(address)
.ok_or(Error::RegisterNotFoundAfterUpload(address.xorname()))?;
folder.register = updated_register;
}

Ok(upload_summary)
uploader.set_collect_registers(true); // override upload cfg to collect the updated register.
uploader.insert_chunks(self.meta_chunks());
uploader.insert_register(vec![self.register()]);
let upload_summary = uploader.start_upload().await?;

let updated_register = upload_summary
.uploaded_registers
.get(self.address())
.ok_or(Error::RegisterNotFoundAfterUpload(self.address().xorname()))?
.clone();
self.register = updated_register;
Ok(())
}

/// Download a copy of the Folder from the network.
Expand Down Expand Up @@ -334,13 +285,10 @@ impl FoldersApi {

// Create a new FoldersApi instance with given register.
fn create(client: Client, wallet_dir: &Path, register: ClientRegister) -> Result<Self> {
let files_api = FilesApi::new(client.clone(), wallet_dir.to_path_buf());

Ok(Self {
client,
wallet_dir: wallet_dir.to_path_buf(),
register,
files_api,
metadata: BTreeMap::new(),
})
}
Expand Down
46 changes: 3 additions & 43 deletions sn_client/src/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
wallet::StoragePaymentResult, Client, Error, Result, UploadCfg, UploadSummary, Uploader,
WalletClient,
};

use crate::{wallet::StoragePaymentResult, Client, Error, Result, WalletClient};
use bls::PublicKey;
use crdts::merkle_reg::MerkleReg;
use libp2p::{
kad::{Quorum, Record},
PeerId,
Expand All @@ -23,14 +20,9 @@ use sn_protocol::{
storage::{try_serialize_record, RecordKind, RetryStrategy},
NetworkAddress,
};

use crdts::merkle_reg::MerkleReg;
use sn_registers::{Entry, EntryHash, Permissions, Register, RegisterAddress, SignedRegister};
use sn_transfers::{NanoTokens, Payment};
use std::{
collections::{BTreeSet, HashSet, LinkedList},
path::PathBuf,
};
use std::collections::{BTreeSet, HashSet, LinkedList};
use xor_name::XorName;

/// Cached operations made to an offline Register instance are applied locally only,
Expand Down Expand Up @@ -555,38 +547,6 @@ impl ClientRegister {
Ok((storage_cost, royalties_fees))
}

/// Sync multiple Registers with the network.
pub async fn sync_multiple(
registers: impl IntoIterator<Item = &mut Self>,
root_dir: PathBuf,
upload_cfg: UploadCfg,
) -> Result<UploadSummary> {
let registers_vec = registers.into_iter().collect::<Vec<_>>();
let client = registers_vec
.first()
.ok_or(Error::BatchSyncEmptyList)?
.client
.clone();

let mut uploader = Uploader::new(client, root_dir);
uploader.set_upload_cfg(upload_cfg);
uploader.set_collect_registers(true); // override cfg to collect all the registers
uploader.insert_register(registers_vec.iter().map(|reg| (*reg).clone()));
let mut upload_summary = uploader.start_upload().await?;

// now update the registers
for current_register in registers_vec {
let address = current_register.address();
let updated_register = upload_summary
.uploaded_registers
.remove(address)
.ok_or(Error::RegisterNotFoundAfterUpload(address.xorname()))?;
*current_register = updated_register;
}

Ok(upload_summary)
}

/// Push all operations made locally to the replicas of this Register on the network.
/// This optionally verifies that the stored Register is the same as our local register.
///
Expand Down
2 changes: 1 addition & 1 deletion sn_client/src/uploader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::sync::mpsc;
use xor_name::XorName;

/// The set of options to pass into the `Uploader`
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct UploadCfg {
pub batch_size: usize,
pub verify_store: bool,
Expand Down
Loading

0 comments on commit 6b4c374

Please sign in to comment.