diff --git a/.github/workflows/benchmark-prs.yml b/.github/workflows/benchmark-prs.yml index 25392240a3..eb27cf7ffc 100644 --- a/.github/workflows/benchmark-prs.yml +++ b/.github/workflows/benchmark-prs.yml @@ -85,11 +85,13 @@ jobs: ########################### ### Client Mem Analysis ### ########################### + ### The peak limit shall be restored back to 50MB, + ### Once client side chunking/quoting flow got re-examined. - name: Check client memory usage shell: bash run: | - client_peak_mem_limit_mb="1024" # mb + client_peak_mem_limit_mb="1500" # mb client_avg_mem_limit_mb="512" # mb peak_mem_usage=$( diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index a02767594c..c43cdcdf8e 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -1063,50 +1063,6 @@ impl Network { send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd); } - /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name. - /// If `client` is false, then include `self` among the `closest_peers` - pub async fn get_close_group_closest_peers( - &self, - key: &NetworkAddress, - client: bool, - ) -> Result> { - debug!("Getting the closest peers to {key:?}"); - let (sender, receiver) = oneshot::channel(); - self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { - key: key.clone(), - sender, - }); - let k_bucket_peers = receiver.await?; - - // Count self in if among the CLOSE_GROUP_SIZE closest and sort the result - let result_len = k_bucket_peers.len(); - let mut closest_peers = k_bucket_peers; - // ensure we're not including self here - if client { - // remove our peer id from the calculations here: - closest_peers.retain(|&x| x != self.peer_id()); - if result_len != closest_peers.len() { - info!("Remove self client from the closest_peers"); - } - } - if tracing::level_enabled!(tracing::Level::DEBUG) { - let close_peers_pretty_print: Vec<_> = closest_peers - .iter() - .map(|peer_id| { - format!( - "{peer_id:?}({:?})", - PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key()) - ) - }) - .collect(); - - debug!("Network knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}"); - } - - let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?; - Ok(closest_peers.into_iter().cloned().collect()) - } - /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name. /// If `client` is false, then include `self` among the `closest_peers` /// @@ -1155,7 +1111,7 @@ impl Network { ); } - let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?; + let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE + 1)?; Ok(closest_peers.into_iter().cloned().collect()) } diff --git a/ant-node/src/log_markers.rs b/ant-node/src/log_markers.rs index d5ef326b63..23f7c0829e 100644 --- a/ant-node/src/log_markers.rs +++ b/ant-node/src/log_markers.rs @@ -51,7 +51,7 @@ pub enum Marker<'a> { /// Valid paid to us and royalty paid register stored ValidPaidRegisterPutFromClient(&'a PrettyPrintRecordKey<'a>), /// Valid transaction stored - ValidSpendPutFromClient(&'a PrettyPrintRecordKey<'a>), + ValidTransactionPutFromClient(&'a PrettyPrintRecordKey<'a>), /// Valid scratchpad stored ValidScratchpadRecordPutFromClient(&'a PrettyPrintRecordKey<'a>), diff --git a/ant-node/src/put_validation.rs b/ant-node/src/put_validation.rs index 9beec8b740..67a01b275b 100644 --- a/ant-node/src/put_validation.rs +++ b/ant-node/src/put_validation.rs @@ -6,6 +6,8 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. +use std::collections::BTreeSet; + use crate::{node::Node, Error, Marker, Result}; use ant_evm::payment_vault::verify_data_payment; use ant_evm::{AttoTokens, ProofOfPayment}; @@ -162,29 +164,11 @@ impl Node { .await } RecordKind::Transaction => { - let record_key = record.key.clone(); - let value_to_hash = record.value.clone(); - let transactions = try_deserialize_record::>(&record)?; - let result = self - .validate_merge_and_store_transactions(transactions, &record_key) - .await; - if result.is_ok() { - Marker::ValidSpendPutFromClient(&PrettyPrintRecordKey::from(&record_key)).log(); - let content_hash = XorName::from_content(&value_to_hash); - self.replicate_valid_fresh_record( - record_key, - RecordType::NonChunk(content_hash), - ); - - // Notify replication_fetcher to mark the attempt as completed. - // Send the notification earlier to avoid it got skipped due to: - // the record becomes stored during the fetch because of other interleaved process. - self.network().notify_fetch_completed( - record.key.clone(), - RecordType::NonChunk(content_hash), - ); - } - result + // Transactions should always be paid for + error!("Transaction should not be validated at this point"); + Err(Error::InvalidPutWithoutPayment( + PrettyPrintRecordKey::from(&record.key).into_owned(), + )) } RecordKind::TransactionWithPayment => { let (payment, transaction) = @@ -224,6 +208,12 @@ impl Node { .await; if res.is_ok() { let content_hash = XorName::from_content(&record.value); + Marker::ValidTransactionPutFromClient(&PrettyPrintRecordKey::from(&record.key)) + .log(); + self.replicate_valid_fresh_record( + record.key.clone(), + RecordType::NonChunk(content_hash), + ); // Notify replication_fetcher to mark the attempt as completed. // Send the notification earlier to avoid it got skipped due to: @@ -601,23 +591,24 @@ impl Node { } // verify the transactions - let mut validated_transactions: Vec = transactions_for_key + let mut validated_transactions: BTreeSet = transactions_for_key .into_iter() .filter(|t| t.verify()) .collect(); // skip if none are valid - let addr = match validated_transactions.as_slice() { - [] => { + let addr = match validated_transactions.first() { + None => { warn!("Found no validated transactions to store at {pretty_key:?}"); return Ok(()); } - [t, ..] => t.address(), + Some(t) => t.address(), }; - // add local transactions to the validated transactions + // add local transactions to the validated transactions, turn to Vec let local_txs = self.get_local_transactions(addr).await?; - validated_transactions.extend(local_txs); + validated_transactions.extend(local_txs.into_iter()); + let validated_transactions: Vec = validated_transactions.into_iter().collect(); // store the record into the local storage let record = Record { diff --git a/ant-protocol/src/storage/transaction.rs b/ant-protocol/src/storage/transaction.rs index 0045f9e746..6f7a7a9b11 100644 --- a/ant-protocol/src/storage/transaction.rs +++ b/ant-protocol/src/storage/transaction.rs @@ -7,6 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. use super::address::TransactionAddress; +use bls::SecretKey; use serde::{Deserialize, Serialize}; // re-exports @@ -27,13 +28,15 @@ pub struct Transaction { } impl Transaction { + /// Create a new transaction, signing it with the provided secret key. pub fn new( owner: PublicKey, parents: Vec, content: TransactionContent, outputs: Vec<(PublicKey, TransactionContent)>, - signature: Signature, + signing_key: &SecretKey, ) -> Self { + let signature = signing_key.sign(Self::bytes_to_sign(&owner, &parents, &content, &outputs)); Self { owner, parents, @@ -43,28 +46,45 @@ impl Transaction { } } - pub fn address(&self) -> TransactionAddress { - TransactionAddress::from_owner(self.owner) + /// Create a new transaction, with the signature already calculated. + pub fn new_with_signature( + owner: PublicKey, + parents: Vec, + content: TransactionContent, + outputs: Vec<(PublicKey, TransactionContent)>, + signature: Signature, + ) -> Self { + Self { + owner, + parents, + content, + outputs, + signature, + } } - pub fn bytes_for_signature(&self) -> Vec { + /// Get the bytes that the signature is calculated from. + pub fn bytes_to_sign( + owner: &PublicKey, + parents: &[PublicKey], + content: &[u8], + outputs: &[(PublicKey, TransactionContent)], + ) -> Vec { let mut bytes = Vec::new(); - bytes.extend_from_slice(&self.owner.to_bytes()); + bytes.extend_from_slice(&owner.to_bytes()); bytes.extend_from_slice("parent".as_bytes()); bytes.extend_from_slice( - &self - .parents + &parents .iter() .map(|p| p.to_bytes()) .collect::>() .concat(), ); bytes.extend_from_slice("content".as_bytes()); - bytes.extend_from_slice(&self.content); + bytes.extend_from_slice(content); bytes.extend_from_slice("outputs".as_bytes()); bytes.extend_from_slice( - &self - .outputs + &outputs .iter() .flat_map(|(p, c)| [&p.to_bytes(), c.as_slice()].concat()) .collect::>(), @@ -72,6 +92,15 @@ impl Transaction { bytes } + pub fn address(&self) -> TransactionAddress { + TransactionAddress::from_owner(self.owner) + } + + /// Get the bytes that the signature is calculated from. + pub fn bytes_for_signature(&self) -> Vec { + Self::bytes_to_sign(&self.owner, &self.parents, &self.content, &self.outputs) + } + pub fn verify(&self) -> bool { self.owner .verify(&self.signature, self.bytes_for_signature()) diff --git a/autonomi/src/client/mod.rs b/autonomi/src/client/mod.rs index 8a233b8085..fae0a87ba8 100644 --- a/autonomi/src/client/mod.rs +++ b/autonomi/src/client/mod.rs @@ -14,10 +14,12 @@ pub mod payment; pub mod quote; pub mod data; +pub mod files; +pub mod transactions; + #[cfg(feature = "external-signer")] #[cfg_attr(docsrs, doc(cfg(feature = "external-signer")))] pub mod external_signer; -pub mod files; #[cfg(feature = "registers")] #[cfg_attr(docsrs, doc(cfg(feature = "registers")))] pub mod registers; diff --git a/autonomi/src/client/registers.rs b/autonomi/src/client/registers.rs index fa353d4873..d2ae5f203a 100644 --- a/autonomi/src/client/registers.rs +++ b/autonomi/src/client/registers.rs @@ -239,7 +239,7 @@ impl Client { name: String, owner: RegisterSecretKey, ) -> Result { - info!("Getting cost for register with name: {name}"); + trace!("Getting cost for register with name: {name}"); // get register address let pk = owner.public_key(); let name = XorName::from_content_parts(&[name.as_bytes()]); @@ -321,15 +321,6 @@ impl Client { }; let payees = proof.payees(); - - if payees.is_empty() { - error!( - "Failed to get payees from payment proof: {:?}", - RegisterError::PayeesMissing - ); - return Err(RegisterError::PayeesMissing); - } - let signed_register = register.signed_reg.clone(); let record = Record { @@ -356,7 +347,7 @@ impl Client { put_quorum: Quorum::All, retry_strategy: None, use_put_record_to: Some(payees), - verification: Some((VerificationKind::Network, get_cfg)), + verification: Some((VerificationKind::Crdt, get_cfg)), }; debug!("Storing register at address {address} to the network"); diff --git a/autonomi/src/client/transactions.rs b/autonomi/src/client/transactions.rs new file mode 100644 index 0000000000..1585709960 --- /dev/null +++ b/autonomi/src/client/transactions.rs @@ -0,0 +1,153 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use crate::client::data::PayError; +use crate::client::Client; +use crate::client::ClientEvent; +use crate::client::UploadSummary; + +use ant_evm::Amount; +use ant_evm::AttoTokens; +pub use ant_protocol::storage::Transaction; +use ant_protocol::storage::TransactionAddress; +pub use bls::SecretKey; + +use ant_evm::{EvmWallet, EvmWalletError}; +use ant_networking::{GetRecordCfg, NetworkError, PutRecordCfg, VerificationKind}; +use ant_protocol::{ + storage::{try_serialize_record, RecordKind, RetryStrategy}, + NetworkAddress, +}; +use libp2p::kad::{Quorum, Record}; + +use super::data::CostError; + +#[derive(Debug, thiserror::Error)] +pub enum TransactionError { + #[error("Cost error: {0}")] + Cost(#[from] CostError), + #[error("Network error")] + Network(#[from] NetworkError), + #[error("Serialization error")] + Serialization, + #[error("Transaction could not be verified (corrupt)")] + FailedVerification, + #[error("Payment failure occurred during transaction creation.")] + Pay(#[from] PayError), + #[error("Failed to retrieve wallet payment")] + Wallet(#[from] EvmWalletError), + #[error("Received invalid quote from node, this node is possibly malfunctioning, try another node by trying another transaction name")] + InvalidQuote, + #[error("Transaction already exists at this address: {0:?}")] + TransactionAlreadyExists(TransactionAddress), +} + +impl Client { + /// Fetches a Transaction from the network. + pub async fn transaction_get( + &self, + address: TransactionAddress, + ) -> Result, TransactionError> { + let transactions = self.network.get_transactions(address).await?; + + Ok(transactions) + } + + pub async fn transaction_put( + &self, + transaction: Transaction, + wallet: &EvmWallet, + ) -> Result<(), TransactionError> { + let address = transaction.address(); + + // pay for the transaction + let xor_name = address.xorname(); + debug!("Paying for transaction at address: {address:?}"); + let payment_proofs = self + .pay(std::iter::once(*xor_name), wallet) + .await + .inspect_err(|err| { + error!("Failed to pay for transaction at address: {address:?} : {err}") + })?; + + // make sure the transaction was paid for + let (proof, price) = match payment_proofs.get(xor_name) { + Some((proof, price)) => (proof, price), + None => { + // transaction was skipped, meaning it was already paid for + error!("Transaction at address: {address:?} was already paid for"); + return Err(TransactionError::TransactionAlreadyExists(address)); + } + }; + + // prepare the record for network storage + let payees = proof.payees(); + let record = Record { + key: NetworkAddress::from_transaction_address(address).to_record_key(), + value: try_serialize_record(&(proof, &transaction), RecordKind::TransactionWithPayment) + .map_err(|_| TransactionError::Serialization)? + .to_vec(), + publisher: None, + expires: None, + }; + let get_cfg = GetRecordCfg { + get_quorum: Quorum::Majority, + retry_strategy: Some(RetryStrategy::default()), + target_record: None, + expected_holders: Default::default(), + is_register: false, + }; + let put_cfg = PutRecordCfg { + put_quorum: Quorum::All, + retry_strategy: None, + use_put_record_to: Some(payees), + verification: Some((VerificationKind::Crdt, get_cfg)), + }; + + // put the record to the network + debug!("Storing transaction at address {address:?} to the network"); + self.network + .put_record(record, &put_cfg) + .await + .inspect_err(|err| { + error!("Failed to put record - transaction {address:?} to the network: {err}") + })?; + + // send client event + if let Some(channel) = self.client_event_sender.as_ref() { + let summary = UploadSummary { + record_count: 1, + tokens_spent: price.as_atto(), + }; + if let Err(err) = channel.send(ClientEvent::UploadComplete(summary)).await { + error!("Failed to send client event: {err}"); + } + } + + Ok(()) + } + + /// Get the cost to create a transaction + pub async fn transaction_cost(&self, key: SecretKey) -> Result { + let pk = key.public_key(); + trace!("Getting cost for transaction of {pk:?}"); + + let address = TransactionAddress::from_owner(pk); + let xor = *address.xorname(); + let store_quote = self.get_store_quotes(std::iter::once(xor)).await?; + let total_cost = AttoTokens::from_atto( + store_quote + .0 + .values() + .map(|quote| quote.price()) + .sum::(), + ); + debug!("Calculated the cost to create transaction of {pk:?} is {total_cost}"); + Ok(total_cost) + } +} diff --git a/autonomi/tests/transaction.rs b/autonomi/tests/transaction.rs new file mode 100644 index 0000000000..76f0bd760d --- /dev/null +++ b/autonomi/tests/transaction.rs @@ -0,0 +1,53 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use ant_logging::LogBuilder; +use ant_protocol::storage::Transaction; +use autonomi::{client::transactions::TransactionError, Client}; +use eyre::Result; +use test_utils::{evm::get_funded_wallet, peers_from_env}; + +#[tokio::test] +async fn transaction_put() -> Result<()> { + let _log_appender_guard = LogBuilder::init_single_threaded_tokio_test("transaction", false); + + let client = Client::connect(&peers_from_env()?).await?; + let wallet = get_funded_wallet(); + + let key = bls::SecretKey::random(); + let content = [0u8; 32]; + let transaction = Transaction::new(key.public_key(), vec![], content, vec![], &key); + + // estimate the cost of the transaction + let cost = client.transaction_cost(key.clone()).await?; + println!("transaction cost: {cost}"); + + // put the transaction + client.transaction_put(transaction.clone(), &wallet).await?; + println!("transaction put 1"); + + // wait for the transaction to be replicated + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + + // check that the transaction is stored + let txs = client.transaction_get(transaction.address()).await?; + assert_eq!(txs, vec![transaction.clone()]); + println!("transaction got 1"); + + // try put another transaction with the same address + let content2 = [1u8; 32]; + let transaction2 = Transaction::new(key.public_key(), vec![], content2, vec![], &key); + let res = client.transaction_put(transaction2.clone(), &wallet).await; + + assert!(matches!( + res, + Err(TransactionError::TransactionAlreadyExists(address)) + if address == transaction2.address() + )); + Ok(()) +}