Skip to content

Commit

Permalink
Merge pull request maidsafe#2224 from joshuef/RangeBasedGets
Browse files Browse the repository at this point in the history
Range based gets
  • Loading branch information
joshuef authored Oct 16, 2024
2 parents 567200e + 972610c commit 1d14196
Show file tree
Hide file tree
Showing 30 changed files with 1,559 additions and 1,042 deletions.
392 changes: 229 additions & 163 deletions .github/workflows/merge.yml

Large diffs are not rendered by default.

85 changes: 70 additions & 15 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ use super::{
use bls::{PublicKey, SecretKey, Signature};
use libp2p::{
identity::Keypair,
kad::{Quorum, Record},
kad::{KBucketDistance, Quorum, Record},
Multiaddr, PeerId,
};
use rand::{thread_rng, Rng};
use sn_networking::{
get_signed_spend_from_record, multiaddr_is_global,
target_arch::{interval, spawn, timeout, Instant},
GetRecordCfg, NetworkBuilder, NetworkError, NetworkEvent, PutRecordCfg, VerificationKind,
GetRecordCfg, GetRecordError, NetworkBuilder, NetworkError, NetworkEvent, PutRecordCfg,
VerificationKind,
};
use sn_protocol::{
error::Error as ProtocolError,
Expand Down Expand Up @@ -305,6 +306,11 @@ impl Client {
self.events_broadcaster.subscribe()
}

/// Return the underlying network GetRange
pub async fn get_range(&self) -> Result<KBucketDistance> {
self.network.get_range().await.map_err(Error::from)
}

/// Sign the given data.
///
/// # Arguments
Expand Down Expand Up @@ -405,18 +411,60 @@ impl Client {
/// let xorname = XorName::random(&mut rng);
/// let address = RegisterAddress::new(xorname, owner);
/// // Get a signed register
/// let signed_register = client.get_signed_register_from_network(address);
/// let signed_register = client.get_signed_register_from_network(address, true);
/// # Ok(())
/// # }
/// ```
pub async fn get_signed_register_from_network(
&self,
address: RegisterAddress,
is_verifying: bool,
) -> Result<SignedRegister> {
let key = NetworkAddress::from_register_address(address).to_record_key();
let get_quorum = if is_verifying {
Quorum::All
} else {
Quorum::Majority
};
let retry_strategy = if is_verifying {
Some(RetryStrategy::Balanced)
} else {
Some(RetryStrategy::Quick)
};
let get_cfg = GetRecordCfg {
get_quorum,
retry_strategy,
target_record: None,
expected_holders: Default::default(),
is_register: true,
};

let maybe_record = self.network.get_record_from_network(key, &get_cfg).await;
let record = match &maybe_record {
Ok(r) => r,
Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { result_map })) => {
let mut results_to_merge = HashMap::default();

for (address, (r, _peers)) in result_map {
results_to_merge.insert(*address, r.clone());
}

return merge_register_records(address, &results_to_merge);
}
Err(e) => {
warn!("Failed to get record at {address:?} from the network: {e:?}");
return Err(ProtocolError::RegisterNotFound(Box::new(address)).into());
}
};

debug!(
"Got record from the network, {:?}",
PrettyPrintRecordKey::from(&record.key)
);

let maybe_records = self.network.get_register_record_from_network(key).await?;
merge_register_records(address, &maybe_records)
let register = get_register_from_record(record)
.map_err(|_| ProtocolError::RegisterNotFound(Box::new(address)))?;
Ok(register)
}

/// Retrieve a Register from the network.
Expand Down Expand Up @@ -742,7 +790,7 @@ impl Client {
/// ```
pub async fn verify_register_stored(&self, address: RegisterAddress) -> Result<SignedRegister> {
info!("Verifying register: {address:?}");
self.get_signed_register_from_network(address).await
self.get_signed_register_from_network(address, true).await
}

/// Quickly checks if a `Register` is stored by expected nodes on the network.
Expand Down Expand Up @@ -776,7 +824,7 @@ impl Client {
address: RegisterAddress,
) -> Result<SignedRegister> {
info!("Quickly checking for existing register : {address:?}");
self.get_signed_register_from_network(address).await
self.get_signed_register_from_network(address, false).await
}

/// Send a `SpendCashNote` request to the network. Protected method.
Expand Down Expand Up @@ -816,25 +864,34 @@ impl Client {
.iter()
.cloned()
.collect();
info!("Expecting holders: {expected_holders:?}");
(Some(record.clone()), expected_holders)
} else {
(None, Default::default())
};

// When there is retry on Put side, no need to have a retry on Get
let verification_cfg = GetRecordCfg {
get_quorum: Quorum::Majority,
get_quorum: Quorum::All,
retry_strategy: None,
target_record: record_to_verify,
expected_holders,
is_register: false,
};

let verification = if verify_store {
Some((VerificationKind::Network, verification_cfg))
} else {
None
};

let put_cfg = PutRecordCfg {
put_quorum: Quorum::Majority,
put_quorum: Quorum::All,
retry_strategy: Some(RetryStrategy::Persistent),
use_put_record_to: None,
verification: Some((VerificationKind::Network, verification_cfg)),
verification,
};

Ok(self.network.put_record(record, &put_cfg).await?)
}

Expand Down Expand Up @@ -871,7 +928,7 @@ impl Client {
self.try_fetch_spend_from_network(
address,
GetRecordCfg {
get_quorum: Quorum::Majority,
get_quorum: Quorum::All,
retry_strategy: Some(RetryStrategy::Balanced),
target_record: None,
expected_holders: Default::default(),
Expand Down Expand Up @@ -904,7 +961,7 @@ impl Client {
self.try_fetch_spend_from_network(
address,
GetRecordCfg {
get_quorum: Quorum::Majority,
get_quorum: Quorum::All,
retry_strategy: None,
target_record: None,
expected_holders: Default::default(),
Expand Down Expand Up @@ -961,9 +1018,7 @@ impl Client {
}
Err(err) => {
warn!("Invalid signed spend got from network for {address:?}: {err:?}.");
Err(Error::CouldNotVerifyTransfer(format!(
"Verification failed for spent at {address:?} with error {err:?}"
)))
Err(Error::from(err))
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions sn_client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::UploadSummary;
use super::ClientEvent;
use sn_protocol::NetworkAddress;
use sn_registers::{Entry, EntryHash};
use sn_transfers::SpendAddress;
use std::collections::BTreeSet;
use thiserror::Error;
use tokio::time::Duration;
Expand All @@ -23,6 +24,8 @@ use xor_name::XorName;
pub enum Error {
#[error("Genesis disbursement failed")]
GenesisDisbursement,
#[error("Faucet disbursement has already occured")]
FaucetDisbursement,

#[error("Genesis error {0}")]
GenesisError(#[from] sn_transfers::GenesisError),
Expand All @@ -45,6 +48,9 @@ pub enum Error {
#[error("Chunks error {0}.")]
Chunks(#[from] super::chunks::Error),

#[error("No cashnote found at {0:?}.")]
NoCashNoteFound(SpendAddress),

#[error("Decrypting a Folder's item failed: {0}")]
FolderEntryDecryption(EntryHash),

Expand All @@ -63,9 +69,6 @@ pub enum Error {
#[error(transparent)]
JoinError(#[from] tokio::task::JoinError),

/// A general error when verifying a transfer validity in the network.
#[error("Failed to verify transfer validity in the network {0}")]
CouldNotVerifyTransfer(String),
#[error("Invalid DAG")]
InvalidDag,
#[error("Serialization error: {0:?}")]
Expand Down
16 changes: 15 additions & 1 deletion sn_client/src/faucet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,21 @@ pub async fn fund_faucet_from_genesis_wallet(
if client.is_genesis_spend_present().await {
warn!("Faucet can't get funded from genesis, genesis is already spent!");
println!("Faucet can't get funded from genesis, genesis is already spent!");
panic!("Faucet can't get funded from genesis, genesis is already spent!");
// Try loading cash notes up to 100 times, waiting 1 second between attempts
for attempt in 1..=100 {
println!("Attempt {attempt} to load cash notes");
debug!("Attempt {attempt} to load cash notes");
faucet_wallet.try_load_cash_notes()?;
if !faucet_wallet.balance().is_zero() {
println!("Successfully loaded cash notes on attempt {attempt}");
debug!("Successfully loaded cash notes on attempt {attempt}");
return Ok(());
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}

// If we've tried 100 times and still have zero balance, return an error
return Err(Error::FaucetDisbursement);
}

println!("Initiating genesis...");
Expand Down
2 changes: 1 addition & 1 deletion sn_client/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tracing::{info, warn};
pub const AMOUNT_TO_FUND_WALLETS: u64 = 100 * 1_000_000_000;

// The number of times to try to load the faucet wallet
const LOAD_FAUCET_WALLET_RETRIES: usize = 6;
const LOAD_FAUCET_WALLET_RETRIES: usize = 10;

// mutex to restrict access to faucet wallet from concurrent tests
static FAUCET_WALLET_MUTEX: Mutex<()> = Mutex::const_new(());
Expand Down
38 changes: 28 additions & 10 deletions sn_client/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1009,9 +1009,7 @@ impl Client {
}

if cash_notes.is_empty() {
return Err(WalletError::CouldNotVerifyTransfer(
"All the redeemed CashNotes are already spent".to_string(),
));
return Err(WalletError::AllRedeemedCashnotesSpent);
}

Ok(cash_notes)
Expand Down Expand Up @@ -1049,23 +1047,40 @@ impl Client {
/// # }
/// ```
pub async fn verify_cashnote(&self, cash_note: &CashNote) -> WalletResult<()> {
let address = SpendAddress::from_unique_pubkey(&cash_note.unique_pubkey());

// We need to get all the spends in the cash_note from the network,
// and compare them to the spends in the cash_note, to know if the
// transfer is considered valid in the network.
let mut tasks = Vec::new();

info!(
"parent spends for cn; {address:?}: {:?}",
&cash_note.parent_spends.len()
);

for spend in &cash_note.parent_spends {
let address = SpendAddress::from_unique_pubkey(spend.unique_pubkey());
debug!(
"Getting spend for pubkey {:?} from network at {address:?}",
info!(
"Getting parent spend for cn {address:?} pubkey {:?} from network at {address:?}",
spend.unique_pubkey()
);
tasks.push(self.get_spend_from_network(address));
}

let mut received_spends = std::collections::BTreeSet::new();
for result in join_all(tasks).await {
let network_valid_spend =
result.map_err(|err| WalletError::CouldNotVerifyTransfer(err.to_string()))?;
let network_valid_spend = match result {
Ok(spend) => Ok(spend),
Err(error) => match error {
Error::Network(sn_networking::NetworkError::DoubleSpendAttempt(spends)) => {
warn!("BurntSpend found with {spends:?}");
Err(WalletError::BurntSpend)
}
err => Err(WalletError::CouldNotVerifyTransfer(format!("{err:?}"))),
},
}?;

let _ = received_spends.insert(network_valid_spend);
}

Expand All @@ -1074,9 +1089,12 @@ impl Client {
if received_spends == cash_note.parent_spends {
return Ok(());
}
Err(WalletError::CouldNotVerifyTransfer(
"The spends in network were not the same as the ones in the CashNote. The parents of this CashNote are probably double spends.".into(),
))

warn!(
"Unexpected parent spends found in CashNote verification at {:?}: {received_spends:?}.",
address
);
Err(WalletError::UnexpectedParentSpends(address))
}
}

Expand Down
2 changes: 1 addition & 1 deletion sn_networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ rayon = "1.8.0"
rmp-serde = "1.1.1"
self_encryption = "~0.30.0"
serde = { version = "1.0.133", features = ["derive", "rc"] }
sn_build_info = { path="../sn_build_info", version = "0.1.15" }
sn_build_info = { path = "../sn_build_info", version = "0.1.15" }
sn_protocol = { path = "../sn_protocol", version = "0.17.11" }
sn_transfers = { path = "../sn_transfers", version = "0.19.3" }
sn_registers = { path = "../sn_registers", version = "0.3.21" }
Expand Down
Loading

0 comments on commit 1d14196

Please sign in to comment.