From 8ca143b6a58e85cfd75c6eaadcf9997e15a80ecd Mon Sep 17 00:00:00 2001 From: Nadin Jbara <93648739+nadin-Starkware@users.noreply.github.com> Date: Sun, 18 Aug 2024 10:28:15 +0300 Subject: [PATCH 01/16] refactor(infra): clean up scripts folder (#476) --- commitlint.config.js | 1 + crates/starknet_api/scripts/rust_fmt.sh | 3 --- 2 files changed, 1 insertion(+), 3 deletions(-) delete mode 100755 crates/starknet_api/scripts/rust_fmt.sh diff --git a/commitlint.config.js b/commitlint.config.js index 1ad7253124..b49cf581e6 100644 --- a/commitlint.config.js +++ b/commitlint.config.js @@ -34,6 +34,7 @@ const Configuration = { 'fee', 'gateway', 'helm', + 'infra', 'JSON-RPC', 'load_test', 'mempool', diff --git a/crates/starknet_api/scripts/rust_fmt.sh b/crates/starknet_api/scripts/rust_fmt.sh deleted file mode 100755 index a220c5944f..0000000000 --- a/crates/starknet_api/scripts/rust_fmt.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -cargo +nightly-2023-07-05 fmt --all -- "$@" From 24239049c9e19562b055a1c4754f5c1a367ad652 Mon Sep 17 00:00:00 2001 From: mohammad-starkware <130282237+MohammadNassar1@users.noreply.github.com> Date: Sun, 18 Aug 2024 10:33:18 +0300 Subject: [PATCH 02/16] refactor(mempool): transaction reference method rename to new_from_thin_tx (#486) --- crates/mempool/src/mempool.rs | 2 +- crates/mempool/src/mempool_test.rs | 64 ++++++++++++++------------ crates/mempool/src/transaction_pool.rs | 16 ++++--- 3 files changed, 45 insertions(+), 37 deletions(-) diff --git a/crates/mempool/src/mempool.rs b/crates/mempool/src/mempool.rs index b2fc747b8d..1178974d0f 100644 --- a/crates/mempool/src/mempool.rs +++ b/crates/mempool/src/mempool.rs @@ -224,7 +224,7 @@ pub struct TransactionReference { } impl TransactionReference { - pub fn new(tx: &ThinTransaction) -> Self { + pub fn new_from_thin_tx(tx: &ThinTransaction) -> Self { TransactionReference { sender_address: tx.sender_address, nonce: tx.nonce, diff --git a/crates/mempool/src/mempool_test.rs b/crates/mempool/src/mempool_test.rs index 6343b02e12..3683319aaf 100644 --- a/crates/mempool/src/mempool_test.rs +++ b/crates/mempool/src/mempool_test.rs @@ -211,7 +211,7 @@ fn test_get_txs_returns_by_priority_order(#[case] requested_txs: usize) { let tx_tip_10_account_2 = add_tx_input!(tip: 10, tx_hash: 3, sender_address: "0x2").tx; let mut txs = vec![tx_tip_20_account_0, tx_tip_30_account_1, tx_tip_10_account_2]; - let tx_references_iterator = txs.iter().map(TransactionReference::new); + let tx_references_iterator = txs.iter().map(TransactionReference::new_from_thin_tx); let txs_iterator = txs.iter().cloned(); let mut mempool: Mempool = MempoolContent::new(txs_iterator, tx_references_iterator).into(); @@ -229,7 +229,7 @@ fn test_get_txs_returns_by_priority_order(#[case] requested_txs: usize) { assert_eq!(fetched_txs, expected_queue); // Assert: non-returned transactions are still in the mempool. - let remaining_tx_references = remaining_txs.iter().map(TransactionReference::new); + let remaining_tx_references = remaining_txs.iter().map(TransactionReference::new_from_thin_tx); let mempool_content = MempoolContent::new(remaining_txs.to_vec(), remaining_tx_references); mempool_content.assert_eq_mempool_content(&mempool); } @@ -244,7 +244,7 @@ fn test_get_txs_multi_nonce() { let tx_nonce_2 = add_tx_input!(tx_hash: 3, sender_address: "0x0", tx_nonce: 2_u8, account_nonce: 0_u8).tx; - let queue_txs = [&tx_nonce_0].map(TransactionReference::new); + let queue_txs = [&tx_nonce_0].map(TransactionReference::new_from_thin_tx); let pool_txs = [tx_nonce_0, tx_nonce_1, tx_nonce_2]; let mut mempool: Mempool = MempoolContent::new(pool_txs.clone(), queue_txs).into(); @@ -267,7 +267,8 @@ fn test_get_txs_replenishes_queue_only_between_chunks() { let tx_address_1_nonce_0 = add_tx_input!(tip: 10, tx_hash: 3, sender_address: "0x1", tx_nonce: 0_u8, account_nonce: 0_u8).tx; - let queue_txs = [&tx_address_0_nonce_0, &tx_address_1_nonce_0].map(TransactionReference::new); + let queue_txs = + [&tx_address_0_nonce_0, &tx_address_1_nonce_0].map(TransactionReference::new_from_thin_tx); let pool_txs = [&tx_address_0_nonce_0, &tx_address_0_nonce_1, &tx_address_1_nonce_0].map(|tx| tx.clone()); let mut mempool: Mempool = MempoolContent::new(pool_txs, queue_txs).into(); @@ -295,7 +296,8 @@ fn test_get_txs_replenishes_queue_multi_account_between_chunks() { let tx_address_1_nonce_1 = add_tx_input!(tip: 20, tx_hash: 4, sender_address: "0x1", tx_nonce: 1_u8, account_nonce: 0_u8).tx; - let queue_txs = [&tx_address_0_nonce_0, &tx_address_1_nonce_0].map(TransactionReference::new); + let queue_txs = + [&tx_address_0_nonce_0, &tx_address_1_nonce_0].map(TransactionReference::new_from_thin_tx); let pool_txs = [ &tx_address_0_nonce_0, &tx_address_1_nonce_0, @@ -313,7 +315,7 @@ fn test_get_txs_replenishes_queue_multi_account_between_chunks() { // Queue is replenished with the next transactions of each account. let expected_queue_txs = - [&tx_address_0_nonce_1, &tx_address_1_nonce_1].map(TransactionReference::new); + [&tx_address_0_nonce_1, &tx_address_1_nonce_1].map(TransactionReference::new_from_thin_tx); let expected_pool_txs = [tx_address_0_nonce_1, tx_address_1_nonce_1]; let expected_mempool_content = MempoolContent::new(expected_pool_txs, expected_queue_txs); expected_mempool_content.assert_eq_mempool_content(&mempool); @@ -327,7 +329,7 @@ fn test_get_txs_with_holes_multiple_accounts() { let tx_address_1_nonce_0 = add_tx_input!(tx_hash: 3, sender_address: "0x1", tx_nonce: 0_u8, account_nonce: 0_u8).tx; - let queue_txs = [TransactionReference::new(&tx_address_1_nonce_0)]; + let queue_txs = [TransactionReference::new_from_thin_tx(&tx_address_1_nonce_0)]; let pool_txs = [tx_address_0_nonce_1.clone(), tx_address_1_nonce_0.clone()]; let mut mempool: Mempool = MempoolContent::new(pool_txs, queue_txs).into(); @@ -383,8 +385,10 @@ fn test_add_tx(mut mempool: Mempool) { add_tx_inputs.sort_by_key(|input| std::cmp::Reverse(input.tx.tip)); // Assert: transactions are ordered by priority. - let expected_queue_txs: Vec = - add_tx_inputs.iter().map(|input| TransactionReference::new(&input.tx)).collect(); + let expected_queue_txs: Vec = add_tx_inputs + .iter() + .map(|input| TransactionReference::new_from_thin_tx(&input.tx)) + .collect(); let expected_pool_txs = add_tx_inputs.into_iter().map(|input| input.tx); let expected_mempool_content = MempoolContent::new(expected_pool_txs, expected_queue_txs); expected_mempool_content.assert_eq_mempool_content(&mempool); @@ -406,8 +410,8 @@ fn test_add_tx_multi_nonce_success(mut mempool: Mempool) { add_tx(&mut mempool, &input_address_0_nonce_1); // Assert: only the eligible transactions appear in the queue. - let expected_queue_txs = - [&input_address_1_nonce_0.tx, &input_address_0_nonce_0.tx].map(TransactionReference::new); + let expected_queue_txs = [&input_address_1_nonce_0.tx, &input_address_0_nonce_0.tx] + .map(TransactionReference::new_from_thin_tx); let expected_pool_txs = [input_address_0_nonce_0.tx, input_address_1_nonce_0.tx, input_address_0_nonce_1.tx]; let expected_mempool_content = MempoolContent::new(expected_pool_txs, expected_queue_txs); @@ -440,7 +444,7 @@ fn test_add_tx_lower_than_queued_nonce() { let lower_nonce_input = add_tx_input!(tx_hash: 2, sender_address: "0x0", tx_nonce: 0_u8, account_nonce: 0_u8); - let queue_txs = [TransactionReference::new(&valid_input.tx)]; + let queue_txs = [TransactionReference::new_from_thin_tx(&valid_input.tx)]; let expected_mempool_content = MempoolContent::with_queue(queue_txs); let pool_txs = [valid_input.tx]; let mut mempool: Mempool = MempoolContent::new(pool_txs, queue_txs).into(); @@ -464,8 +468,10 @@ fn test_add_tx_with_identical_tip_succeeds(mut mempool: Mempool) { add_tx(&mut mempool, &input2); // Assert: both transactions are in the mempool. - let expected_queue_txs = - [TransactionReference::new(&input1.tx), TransactionReference::new(&input2.tx)]; + let expected_queue_txs = [ + TransactionReference::new_from_thin_tx(&input1.tx), + TransactionReference::new_from_thin_tx(&input2.tx), + ]; let expected_pool_txs = [input1.tx, input2.tx]; let expected_mempool_content = MempoolContent::new(expected_pool_txs, expected_queue_txs); @@ -482,7 +488,7 @@ fn test_add_tx_delete_tx_with_lower_nonce_than_account_nonce() { let tx_nonce_1_account_nonce_1 = add_tx_input!(tx_hash: 2, sender_address: "0x0", tx_nonce: 1_u8, account_nonce: 1_u8); - let queue_txs = [TransactionReference::new(&tx_nonce_0_account_nonce_0.tx)]; + let queue_txs = [TransactionReference::new_from_thin_tx(&tx_nonce_0_account_nonce_0.tx)]; let pool_txs = [tx_nonce_0_account_nonce_0.tx]; let mut mempool: Mempool = MempoolContent::new(pool_txs, queue_txs).into(); @@ -511,8 +517,8 @@ fn test_tip_priority_over_tx_hash(mut mempool: Mempool) { add_tx(&mut mempool, &input_small_tip_big_hash); // Assert: ensure that the transaction with the higher tip is prioritized higher. - let expected_queue_txs = - [&input_big_tip_small_hash.tx, &input_small_tip_big_hash.tx].map(TransactionReference::new); + let expected_queue_txs = [&input_big_tip_small_hash.tx, &input_small_tip_big_hash.tx] + .map(TransactionReference::new_from_thin_tx); let expected_mempool_content = MempoolContent::with_queue(expected_queue_txs); expected_mempool_content.assert_eq_queue_content(&mempool); } @@ -533,7 +539,7 @@ fn test_add_tx_account_state_fills_hole(mut mempool: Mempool) { // Then, fill it. add_tx(&mut mempool, &tx_input_nonce_2); - let expected_queue_txs = [&tx_input_nonce_1.tx].map(TransactionReference::new); + let expected_queue_txs = [&tx_input_nonce_1.tx].map(TransactionReference::new_from_thin_tx); let expected_mempool_content = MempoolContent::with_queue(expected_queue_txs); expected_mempool_content.assert_eq_queue_content(&mempool); } @@ -549,7 +555,7 @@ fn test_add_tx_sequential_nonces(mut mempool: Mempool) { add_tx(&mut mempool, &input_nonce_1); // Assert: only eligible transaction appears in the queue. - let expected_queue_txs = [TransactionReference::new(&input_nonce_0.tx)]; + let expected_queue_txs = [TransactionReference::new_from_thin_tx(&input_nonce_0.tx)]; let expected_pool_txs = [input_nonce_0.tx, input_nonce_1.tx]; let expected_mempool_content = MempoolContent::new(expected_pool_txs, expected_queue_txs); @@ -575,7 +581,7 @@ fn test_add_tx_filling_hole(mut mempool: Mempool) { add_tx(&mut mempool, &input_nonce_0); // Assert: only the eligible transaction appears in the queue. - let expected_queue_txs = [TransactionReference::new(&input_nonce_0.tx)]; + let expected_queue_txs = [TransactionReference::new_from_thin_tx(&input_nonce_0.tx)]; let expected_pool_txs = [input_nonce_1.tx, input_nonce_0.tx]; let expected_mempool_content = MempoolContent::new(expected_pool_txs, expected_queue_txs); expected_mempool_content.assert_eq_mempool_content(&mempool); @@ -592,7 +598,7 @@ fn test_commit_block_includes_all_txs() { let tx_address2_nonce1 = add_tx_input!(tip: 1, tx_hash: 4, sender_address: "0x2", tx_nonce: 1_u8, account_nonce: 1_u8).tx; let queue_txs = [&tx_address0_nonce4, &tx_address1_nonce3, &tx_address2_nonce1] - .map(TransactionReference::new); + .map(TransactionReference::new_from_thin_tx); let pool_txs = [tx_address0_nonce4, tx_address0_nonce5, tx_address1_nonce3, tx_address2_nonce1]; let mut mempool: Mempool = MempoolContent::new(pool_txs.clone(), queue_txs).into(); @@ -613,7 +619,7 @@ fn test_commit_block_rewinds_nonce() { // Setup. let tx_address0_nonce5 = add_tx_input!(tip: 1, tx_hash: 2, sender_address: "0x0", tx_nonce: 5_u8, account_nonce: 4_u8).tx; - let queued_txs = [TransactionReference::new(&tx_address0_nonce5)]; + let queued_txs = [TransactionReference::new_from_thin_tx(&tx_address0_nonce5)]; let pool_txs = [tx_address0_nonce5]; let mut mempool: Mempool = MempoolContent::new(pool_txs, queued_txs).into(); @@ -637,7 +643,7 @@ fn test_commit_block_from_different_leader() { let tx_address0_nonce6 = add_tx_input!(tip: 1, tx_hash: 3, sender_address: "0x0", tx_nonce: 6_u8, account_nonce: 2_u8).tx; let tx_address1_nonce2 = add_tx_input!(tip: 1, tx_hash: 4, sender_address: "0x1", tx_nonce: 2_u8, account_nonce: 2_u8).tx; - let queued_txs = [TransactionReference::new(&tx_address1_nonce2)]; + let queued_txs = [TransactionReference::new_from_thin_tx(&tx_address1_nonce2)]; let pool_txs = [tx_address0_nonce3, tx_address0_nonce5, tx_address0_nonce6.clone(), tx_address1_nonce2]; let mut mempool: Mempool = MempoolContent::new(pool_txs, queued_txs).into(); @@ -652,7 +658,7 @@ fn test_commit_block_from_different_leader() { assert!(mempool.commit_block(state_changes).is_ok()); // Assert. - let expected_queue_txs = [&tx_address0_nonce6].map(TransactionReference::new); + let expected_queue_txs = [&tx_address0_nonce6].map(TransactionReference::new_from_thin_tx); let expected_mempool_content = MempoolContent::with_queue(expected_queue_txs); expected_mempool_content.assert_eq_queue_content(&mempool); } @@ -704,7 +710,7 @@ fn test_flow_partial_commit_block() { add_tx_input!(tip: 0, tx_hash: 7, sender_address: "0x2", tx_nonce: 2_u8, account_nonce: 2_u8).tx; let queue_txs = [&tx_address0_nonce3, &tx_address1_nonce0, &tx_address2_nonce2] - .map(TransactionReference::new); + .map(TransactionReference::new_from_thin_tx); let pool_txs = [ &tx_address0_nonce3, &tx_address0_nonce5, @@ -747,7 +753,7 @@ fn test_flow_commit_block_closes_hole() { let tx_nonce5 = add_tx_input!(tip: 12, tx_hash: 3, sender_address: "0x0", tx_nonce: 5_u8, account_nonce: 3_u8).tx; - let queued_txs = [TransactionReference::new(&tx_nonce3)]; + let queued_txs = [TransactionReference::new_from_thin_tx(&tx_nonce3)]; let pool_txs = [tx_nonce3, tx_nonce5.clone()]; let mut mempool: Mempool = MempoolContent::new(pool_txs, queued_txs).into(); @@ -757,7 +763,7 @@ fn test_flow_commit_block_closes_hole() { assert!(mempool.commit_block(state_changes).is_ok()); // Assert: hole was indeed closed. - let expected_queue_txs = [&tx_nonce5].map(TransactionReference::new); + let expected_queue_txs = [&tx_nonce5].map(TransactionReference::new_from_thin_tx); let expected_mempool_content = MempoolContent::with_queue(expected_queue_txs); expected_mempool_content.assert_eq_queue_content(&mempool); @@ -780,7 +786,7 @@ fn test_flow_send_same_nonce_tx_after_previous_not_included() { let tx_nonce5 = add_tx_input!(tip: 12, tx_hash: 3, sender_address: "0x0", tx_nonce: 5_u8, account_nonce: 3_u8).tx; - let queue_txs = [TransactionReference::new(&tx_nonce3)]; + let queue_txs = [TransactionReference::new_from_thin_tx(&tx_nonce3)]; let pool_txs = [&tx_nonce3, &tx_input_nonce4.tx, &tx_nonce5].map(|tx| tx.clone()); let mut mempool: Mempool = MempoolContent::new(pool_txs, queue_txs).into(); @@ -798,7 +804,7 @@ fn test_flow_send_same_nonce_tx_after_previous_not_included() { // Assert. assert_eq!(txs, &[tx_input_nonce4.tx]); - let expected_queue_txs = [TransactionReference::new(&tx_nonce5)]; + let expected_queue_txs = [TransactionReference::new_from_thin_tx(&tx_nonce5)]; let expected_mempool_content = MempoolContent::with_queue(expected_queue_txs); expected_mempool_content.assert_eq_queue_content(&mempool); } diff --git a/crates/mempool/src/transaction_pool.rs b/crates/mempool/src/transaction_pool.rs index 89beb33d89..77833a42de 100644 --- a/crates/mempool/src/transaction_pool.rs +++ b/crates/mempool/src/transaction_pool.rs @@ -28,7 +28,7 @@ pub struct TransactionPool { impl TransactionPool { pub fn insert(&mut self, tx: ThinTransaction) -> MempoolResult<()> { - let tx_reference = TransactionReference::new(&tx); + let tx_reference = TransactionReference::new_from_thin_tx(&tx); let tx_hash = tx_reference.tx_hash; // Insert to pool. @@ -56,12 +56,14 @@ impl TransactionPool { self.tx_pool.remove(&tx_hash).ok_or(MempoolError::TransactionNotFound { tx_hash })?; // Remove from account mapping. - self.txs_by_account.remove(TransactionReference::new(&tx)).unwrap_or_else(|| { - panic!( - "Transaction pool consistency error: transaction with hash {tx_hash} appears in \ - main mapping, but does not appear in the account mapping" - ) - }); + self.txs_by_account.remove(TransactionReference::new_from_thin_tx(&tx)).unwrap_or_else( + || { + panic!( + "Transaction pool consistency error: transaction with hash {tx_hash} appears \ + in main mapping, but does not appear in the account mapping" + ) + }, + ); Ok(tx) } From a37bcb66272a9ced0816f9e8ae51de3b89e9ecdf Mon Sep 17 00:00:00 2001 From: DvirYo-starkware <115620476+DvirYo-starkware@users.noreply.github.com> Date: Sun, 18 Aug 2024 11:31:47 +0300 Subject: [PATCH 03/16] feat(): print the execution time in latency_histogram macro (#457) --- Cargo.lock | 1 + crates/papyrus_proc_macros/Cargo.toml | 1 + crates/papyrus_proc_macros/src/lib.rs | 8 ++++++-- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7644fe9b94..42fbe5ae72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6391,6 +6391,7 @@ dependencies = [ "prometheus-parse", "quote", "syn 2.0.61", + "tracing", ] [[package]] diff --git a/crates/papyrus_proc_macros/Cargo.toml b/crates/papyrus_proc_macros/Cargo.toml index da3d40544f..246ae21407 100644 --- a/crates/papyrus_proc_macros/Cargo.toml +++ b/crates/papyrus_proc_macros/Cargo.toml @@ -9,6 +9,7 @@ description = "Procedural macros for the Papyrus node" [dependencies] quote = "1.0.26" syn = { version = "2.0.39", features = ["full"] } +tracing.workspace = true [dev-dependencies] metrics.workspace = true diff --git a/crates/papyrus_proc_macros/src/lib.rs b/crates/papyrus_proc_macros/src/lib.rs index 3be7f692c7..b04ef485fa 100644 --- a/crates/papyrus_proc_macros/src/lib.rs +++ b/crates/papyrus_proc_macros/src/lib.rs @@ -112,6 +112,7 @@ pub fn versioned_rpc(attr: TokenStream, input: TokenStream) -> TokenStream { } /// This macro will emit a histogram metric with the given name and the latency of the function. +/// In addition, also a debug log with the metric name and the execution time will be emitted. /// The macro also receives a boolean for whether it will be emitted only when /// profiling is activated or at all times. /// @@ -125,7 +126,8 @@ pub fn versioned_rpc(attr: TokenStream, input: TokenStream) -> TokenStream { /// } /// ``` /// Every call to foo will update the histogram metric with the name “metric_name” with the time it -/// took to execute foo. +/// took to execute foo. In addition, a debug log with the following format will be emitted: +/// “: ” /// The metric will be emitted regardless of the value of the profiling configuration, /// since the config value is false. #[proc_macro_attribute] @@ -163,7 +165,9 @@ pub fn latency_histogram(attr: TokenStream, input: TokenStream) -> TokenStream { } let return_value=#origin_block; if let Some(start_time) = start_function_time { - metrics::histogram!(#metric_name, start_time.elapsed().as_secs_f64()); + let exec_time = start_time.elapsed().as_secs_f64(); + metrics::histogram!(#metric_name, exec_time); + tracing::debug!("{}: {}", #metric_name, exec_time); } return_value } From 7e067de4e72f9c6737643abf544829bec08e1613 Mon Sep 17 00:00:00 2001 From: Arnon Hod Date: Sun, 18 Aug 2024 12:02:28 +0300 Subject: [PATCH 04/16] chore: wrap cairo lang version id for code dedup (#83) --- crates/gateway/src/compiler_version.rs | 83 ++++++------- crates/gateway/src/config.rs | 4 +- .../src/stateless_transaction_validator.rs | 3 +- .../stateless_transaction_validator_test.rs | 109 ++++++++++-------- crates/gateway/src/test_utils.rs | 1 + 5 files changed, 102 insertions(+), 98 deletions(-) diff --git a/crates/gateway/src/compiler_version.rs b/crates/gateway/src/compiler_version.rs index ee35b032bd..ce6fa55c23 100644 --- a/crates/gateway/src/compiler_version.rs +++ b/crates/gateway/src/compiler_version.rs @@ -8,7 +8,6 @@ use serde::{Deserialize, Serialize}; use starknet_sierra_compile::utils::sierra_program_as_felts_to_big_uint_as_hex; use starknet_types_core::felt::Felt; use thiserror::Error; -use validator::Validate; #[derive(Debug, Error)] #[cfg_attr(test, derive(PartialEq))] @@ -19,52 +18,34 @@ pub enum VersionIdError { InvalidVersion { message: String }, } -// TODO(Arni): Share this struct with the Cairo lang crate. -#[derive(Clone, Copy, Debug, Serialize, Deserialize, Validate, PartialEq)] -pub struct VersionId { - pub major: usize, - pub minor: usize, - pub patch: usize, -} - -impl VersionId { - pub const MIN: Self = Self { major: 0, minor: 0, patch: 0 }; - pub const MAX: Self = Self { major: usize::MAX, minor: usize::MAX, patch: usize::MAX }; -} - -impl From for CairoLangVersionId { - fn from(version: VersionId) -> Self { - CairoLangVersionId { major: version.major, minor: version.minor, patch: version.patch } - } -} - -impl From for VersionId { - fn from(version: CairoLangVersionId) -> Self { - VersionId { major: version.major, minor: version.minor, patch: version.patch } - } -} +#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)] +pub struct VersionId(pub CairoLangVersionId); impl std::fmt::Display for VersionId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - CairoLangVersionId::from(*self).fmt(f) + self.0.fmt(f) } } impl VersionId { - pub fn from_sierra_program(sierra_program: &[Felt]) -> Result { - let sierra_program_length = sierra_program.len(); + pub const MIN: Self = Self(CairoLangVersionId { major: 0, minor: 0, patch: 0 }); + pub const MAX: Self = + Self(CairoLangVersionId { major: usize::MAX, minor: usize::MAX, patch: usize::MAX }); + + pub fn new(major: usize, minor: usize, patch: usize) -> Self { + Self(CairoLangVersionId { major, minor, patch }) + } - if sierra_program_length < 6 { - return Err(VersionIdError::InvalidVersion { + pub fn from_sierra_program(sierra_program: &[Felt]) -> Result { + let sierra_program_for_compiler = sierra_program_as_felts_to_big_uint_as_hex( + sierra_program.get(..6).ok_or(VersionIdError::InvalidVersion { message: format!( - "Sierra program is too short. Got program of length {}, which is not long \ - enough for Sierra program's headers.", - sierra_program_length + "Failed to retrieve version from the program: insufficient length. Expected \ + at least 6 felts (got {}).", + sierra_program.len() ), - }); - } - let sierra_program_for_compiler = - sierra_program_as_felts_to_big_uint_as_hex(&sierra_program[..6]); + })?, + ); let (version_id, _compiler_version_id) = version_id_from_serialized_sierra_program( &sierra_program_for_compiler, @@ -73,19 +54,27 @@ impl VersionId { message: format!("Error extracting version ID from Sierra program: {err}"), })?; - Ok(version_id.into()) + Ok(VersionId(version_id)) } } impl PartialOrd for VersionId { fn partial_cmp(&self, other: &Self) -> Option { - if self.major != other.major { - return Some(self.major.cmp(&other.major)); + // An implementation of partial_cmp for VersionId. + fn partial_cmp( + lhs: &CairoLangVersionId, + rhs: &CairoLangVersionId, + ) -> Option { + if lhs.major != rhs.major { + return Some(lhs.major.cmp(&rhs.major)); + } + if lhs.minor != rhs.minor { + return Some(lhs.minor.cmp(&rhs.minor)); + } + lhs.patch.partial_cmp(&rhs.patch) } - if self.minor != other.minor { - return Some(self.minor.cmp(&other.minor)); - } - self.patch.partial_cmp(&other.patch) + + partial_cmp(&self.0, &other.0) } } @@ -94,19 +83,19 @@ impl SerializeConfig for VersionId { BTreeMap::from_iter([ ser_param( "major", - &self.major, + &self.0.major, "The major version of the configuration.", ParamPrivacyInput::Public, ), ser_param( "minor", - &self.minor, + &self.0.minor, "The minor version of the configuration.", ParamPrivacyInput::Public, ), ser_param( "patch", - &self.patch, + &self.0.patch, "The patch version of the configuration.", ParamPrivacyInput::Public, ), diff --git a/crates/gateway/src/config.rs b/crates/gateway/src/config.rs index 1b782254f0..87543e2da8 100644 --- a/crates/gateway/src/config.rs +++ b/crates/gateway/src/config.rs @@ -86,8 +86,8 @@ impl Default for StatelessTransactionValidatorConfig { max_calldata_length: 4000, max_signature_length: 4000, max_contract_class_object_size: 4089446, - min_sierra_version: VersionId { major: 1, minor: 1, patch: 0 }, - max_sierra_version: VersionId { major: 1, minor: 5, patch: usize::MAX }, + min_sierra_version: VersionId::new(1, 1, 0), + max_sierra_version: VersionId::new(1, 5, usize::MAX), } } } diff --git a/crates/gateway/src/stateless_transaction_validator.rs b/crates/gateway/src/stateless_transaction_validator.rs index 266946ed4c..b5ca99180b 100644 --- a/crates/gateway/src/stateless_transaction_validator.rs +++ b/crates/gateway/src/stateless_transaction_validator.rs @@ -123,7 +123,8 @@ impl StatelessTransactionValidator { ) -> StatelessTransactionValidatorResult<()> { // Any patch version is valid. (i.e. when check version for upper bound, we ignore the Z // part in a version X.Y.Z). - let max_sierra_version = VersionId { patch: usize::MAX, ..self.config.max_sierra_version }; + let mut max_sierra_version = self.config.max_sierra_version; + max_sierra_version.0.patch = usize::MAX; let sierra_version = VersionId::from_sierra_program(sierra_program)?; if self.config.min_sierra_version <= sierra_version && sierra_version <= max_sierra_version diff --git a/crates/gateway/src/stateless_transaction_validator_test.rs b/crates/gateway/src/stateless_transaction_validator_test.rs index 8d2b4297af..294af871f8 100644 --- a/crates/gateway/src/stateless_transaction_validator_test.rs +++ b/crates/gateway/src/stateless_transaction_validator_test.rs @@ -1,3 +1,4 @@ +use std::sync::OnceLock; use std::vec; use assert_matches::assert_matches; @@ -27,26 +28,34 @@ use crate::stateless_transaction_validator::{ }; use crate::test_utils::create_sierra_program; -const MIN_SIERRA_VERSION: VersionId = VersionId { major: 1, minor: 1, patch: 0 }; -const MAX_SIERRA_VERSION: VersionId = VersionId { major: 1, minor: 5, patch: usize::MAX }; - -const DEFAULT_VALIDATOR_CONFIG_FOR_TESTING: StatelessTransactionValidatorConfig = - StatelessTransactionValidatorConfig { +fn min_sierra_version() -> &'static VersionId { + static MIN_SIERRA_VERSION: OnceLock = OnceLock::new(); + MIN_SIERRA_VERSION.get_or_init(|| VersionId::new(1, 1, 0)) +} +fn max_sierra_version() -> &'static VersionId { + static MAX_SIERRA_VERSION: OnceLock = OnceLock::new(); + MAX_SIERRA_VERSION.get_or_init(|| VersionId::new(1, 5, usize::MAX)) +} +fn default_validator_config_for_testing() -> &'static StatelessTransactionValidatorConfig { + static DEFAULT_VALIDATOR_CONFIG_FOR_TESTING: OnceLock = + OnceLock::new(); + DEFAULT_VALIDATOR_CONFIG_FOR_TESTING.get_or_init(|| StatelessTransactionValidatorConfig { validate_non_zero_l1_gas_fee: false, validate_non_zero_l2_gas_fee: false, max_calldata_length: 1, max_signature_length: 1, max_contract_class_object_size: 100000, - min_sierra_version: MIN_SIERRA_VERSION, - max_sierra_version: MAX_SIERRA_VERSION, - }; + min_sierra_version: *min_sierra_version(), + max_sierra_version: *max_sierra_version(), + }) +} #[rstest] #[case::ignore_resource_bounds( StatelessTransactionValidatorConfig{ validate_non_zero_l1_gas_fee: false, validate_non_zero_l2_gas_fee: false, - ..DEFAULT_VALIDATOR_CONFIG_FOR_TESTING + ..default_validator_config_for_testing().clone() }, zero_resource_bounds_mapping(), calldata![], @@ -56,7 +65,7 @@ const DEFAULT_VALIDATOR_CONFIG_FOR_TESTING: StatelessTransactionValidatorConfig StatelessTransactionValidatorConfig{ validate_non_zero_l1_gas_fee: true, validate_non_zero_l2_gas_fee: false, - ..DEFAULT_VALIDATOR_CONFIG_FOR_TESTING + ..default_validator_config_for_testing().clone() }, create_resource_bounds_mapping(NON_EMPTY_RESOURCE_BOUNDS, ResourceBounds::default()), calldata![], @@ -66,7 +75,7 @@ const DEFAULT_VALIDATOR_CONFIG_FOR_TESTING: StatelessTransactionValidatorConfig StatelessTransactionValidatorConfig{ validate_non_zero_l1_gas_fee: false, validate_non_zero_l2_gas_fee: true, - ..DEFAULT_VALIDATOR_CONFIG_FOR_TESTING + ..default_validator_config_for_testing().clone() }, create_resource_bounds_mapping(ResourceBounds::default(), NON_EMPTY_RESOURCE_BOUNDS), calldata![], @@ -76,26 +85,26 @@ const DEFAULT_VALIDATOR_CONFIG_FOR_TESTING: StatelessTransactionValidatorConfig StatelessTransactionValidatorConfig{ validate_non_zero_l1_gas_fee: true, validate_non_zero_l2_gas_fee: true, - ..DEFAULT_VALIDATOR_CONFIG_FOR_TESTING + ..default_validator_config_for_testing().clone() }, create_resource_bounds_mapping(NON_EMPTY_RESOURCE_BOUNDS, NON_EMPTY_RESOURCE_BOUNDS), calldata![], TransactionSignature::default() )] #[case::non_empty_valid_calldata( - DEFAULT_VALIDATOR_CONFIG_FOR_TESTING, + default_validator_config_for_testing().clone(), zero_resource_bounds_mapping(), calldata![Felt::ONE], TransactionSignature::default() )] #[case::non_empty_valid_signature( - DEFAULT_VALIDATOR_CONFIG_FOR_TESTING, + default_validator_config_for_testing().clone(), zero_resource_bounds_mapping(), calldata![], TransactionSignature(vec![Felt::ONE]) )] #[case::valid_tx( - DEFAULT_VALIDATOR_CONFIG_FOR_TESTING, + default_validator_config_for_testing().clone(), zero_resource_bounds_mapping(), calldata![], TransactionSignature::default() @@ -119,7 +128,7 @@ fn test_positive_flow( StatelessTransactionValidatorConfig{ validate_non_zero_l1_gas_fee: true, validate_non_zero_l2_gas_fee: false, - ..DEFAULT_VALIDATOR_CONFIG_FOR_TESTING + ..default_validator_config_for_testing().clone() }, zero_resource_bounds_mapping(), StatelessTransactionValidatorError::ZeroResourceBounds{ @@ -130,7 +139,7 @@ fn test_positive_flow( StatelessTransactionValidatorConfig{ validate_non_zero_l1_gas_fee: false, validate_non_zero_l2_gas_fee: true, - ..DEFAULT_VALIDATOR_CONFIG_FOR_TESTING + ..default_validator_config_for_testing().clone() }, create_resource_bounds_mapping(NON_EMPTY_RESOURCE_BOUNDS, ResourceBounds::default()), StatelessTransactionValidatorError::ZeroResourceBounds{ @@ -160,7 +169,7 @@ fn test_calldata_too_long( #[values(TransactionType::DeployAccount, TransactionType::Invoke)] tx_type: TransactionType, ) { let tx_validator = - StatelessTransactionValidator { config: DEFAULT_VALIDATOR_CONFIG_FOR_TESTING }; + StatelessTransactionValidator { config: default_validator_config_for_testing().clone() }; let tx = external_tx_for_testing( tx_type, zero_resource_bounds_mapping(), @@ -183,7 +192,7 @@ fn test_signature_too_long( tx_type: TransactionType, ) { let tx_validator = - StatelessTransactionValidator { config: DEFAULT_VALIDATOR_CONFIG_FOR_TESTING }; + StatelessTransactionValidator { config: default_validator_config_for_testing().clone() }; let tx = external_tx_for_testing( tx_type, zero_resource_bounds_mapping(), @@ -205,8 +214,8 @@ fn test_signature_too_long( vec![], StatelessTransactionValidatorError::InvalidSierraVersion ( VersionIdError::InvalidVersion { - message: "Sierra program is too short. Got program of length 0, which is not \ - long enough for Sierra program's headers.".into() + message: "Failed to retrieve version from the program: insufficient length. Expected \ + at least 6 felts (got 0).".into() } ) )] @@ -214,8 +223,8 @@ fn test_signature_too_long( vec![felt!(1_u128)], StatelessTransactionValidatorError::InvalidSierraVersion ( VersionIdError::InvalidVersion { - message: "Sierra program is too short. Got program of length 1, which is not \ - long enough for Sierra program's headers.".into() + message: "Failed to retrieve version from the program: insufficient length. Expected \ + at least 6 felts (got 1).".into() } ) )] @@ -223,8 +232,8 @@ fn test_signature_too_long( vec![felt!(1_u128), felt!(3_u128), felt!(0_u128)], StatelessTransactionValidatorError::InvalidSierraVersion ( VersionIdError::InvalidVersion { - message: "Sierra program is too short. Got program of length 3, which is not \ - long enough for Sierra program's headers.".into() + message: "Failed to retrieve version from the program: insufficient length. Expected \ + at least 6 felts (got 3).".into() } ) )] @@ -232,8 +241,8 @@ fn test_signature_too_long( vec![felt!(1_u128), felt!(3_u128), felt!(0_u128), felt!(0_u128)], StatelessTransactionValidatorError::InvalidSierraVersion ( VersionIdError::InvalidVersion { - message: "Sierra program is too short. Got program of length 4, which is not \ - long enough for Sierra program's headers.".into() + message: "Failed to retrieve version from the program: insufficient length. Expected \ + at least 6 felts (got 4).".into() } ) )] @@ -255,19 +264,19 @@ fn test_signature_too_long( ) ] #[case::sierra_version_too_low( - create_sierra_program(&VersionId { major: 0, minor: 3, patch: 0 }), + create_sierra_program(&VersionId::new(0,3,0)), StatelessTransactionValidatorError::UnsupportedSierraVersion { - version: VersionId{major: 0, minor: 3, patch: 0}, - min_version: MIN_SIERRA_VERSION, - max_version: MAX_SIERRA_VERSION, + version: VersionId::new(0,3,0), + min_version: *min_sierra_version(), + max_version: *max_sierra_version(), }) ] #[case::sierra_version_too_high( - create_sierra_program(&VersionId { major: 1, minor: 6, patch: 0 }), + create_sierra_program(&VersionId::new(1,6,0)), StatelessTransactionValidatorError::UnsupportedSierraVersion { - version: VersionId { major: 1, minor: 6, patch: 0 }, - min_version: MIN_SIERRA_VERSION, - max_version: MAX_SIERRA_VERSION, + version: VersionId::new(1,6,0), + min_version: *min_sierra_version(), + max_version: *max_sierra_version(), }) ] fn test_declare_sierra_version_failure( @@ -275,7 +284,7 @@ fn test_declare_sierra_version_failure( #[case] expected_error: StatelessTransactionValidatorError, ) { let tx_validator = - StatelessTransactionValidator { config: DEFAULT_VALIDATOR_CONFIG_FOR_TESTING }; + StatelessTransactionValidator { config: default_validator_config_for_testing().clone() }; let contract_class = ContractClass { sierra_program, ..Default::default() }; let tx = external_declare_tx(declare_tx_args!(contract_class)); @@ -284,14 +293,18 @@ fn test_declare_sierra_version_failure( } #[rstest] -#[case::min_sierra_version(create_sierra_program(&MIN_SIERRA_VERSION))] -#[case::valid_sierra_version(create_sierra_program(&VersionId { major: 1, minor: 3, patch: 0 }))] -#[case::max_sierra_version_patch_zero(create_sierra_program(&VersionId { patch: 0, ..MAX_SIERRA_VERSION }))] -#[case::max_sierra_version_patch_non_trivial(create_sierra_program(&VersionId { patch: 1, ..MAX_SIERRA_VERSION }))] -#[case::max_sierra_version(create_sierra_program(&MAX_SIERRA_VERSION))] +#[case::min_sierra_version(create_sierra_program(min_sierra_version()))] +#[case::valid_sierra_version(create_sierra_program(&VersionId::new( 1, 3, 0 )))] +#[case::max_sierra_version_patch_zero(create_sierra_program( + &VersionId::new( max_sierra_version().0.major, max_sierra_version().0.minor, 0) +))] +#[case::max_sierra_version_patch_non_trivial(create_sierra_program( + &VersionId::new(max_sierra_version().0.major, max_sierra_version().0.minor, 1) +))] +#[case::max_sierra_version(create_sierra_program(max_sierra_version()))] fn test_declare_sierra_version_sucsses(#[case] sierra_program: Vec) { let tx_validator = - StatelessTransactionValidator { config: DEFAULT_VALIDATOR_CONFIG_FOR_TESTING }; + StatelessTransactionValidator { config: default_validator_config_for_testing().clone() }; let contract_class = ContractClass { sierra_program, ..Default::default() }; let tx = external_declare_tx(declare_tx_args!(contract_class)); @@ -305,11 +318,11 @@ fn test_declare_contract_class_size_too_long() { let tx_validator = StatelessTransactionValidator { config: StatelessTransactionValidatorConfig { max_contract_class_object_size: config_max_contract_class_object_size, - ..DEFAULT_VALIDATOR_CONFIG_FOR_TESTING + ..default_validator_config_for_testing().clone() }, }; let contract_class = ContractClass { - sierra_program: create_sierra_program(&MIN_SIERRA_VERSION), + sierra_program: create_sierra_program(min_sierra_version()), ..Default::default() }; let contract_class_length = serde_json::to_string(&contract_class).unwrap().len(); @@ -370,10 +383,10 @@ fn test_declare_entry_points_not_sorted_by_selector( #[case] expected: StatelessTransactionValidatorResult<()>, ) { let tx_validator = - StatelessTransactionValidator { config: DEFAULT_VALIDATOR_CONFIG_FOR_TESTING }; + StatelessTransactionValidator { config: default_validator_config_for_testing().clone() }; let contract_class = ContractClass { - sierra_program: create_sierra_program(&MIN_SIERRA_VERSION), + sierra_program: create_sierra_program(min_sierra_version()), entry_points_by_type: EntryPointByType { constructor: entry_points.clone(), external: vec![], @@ -386,7 +399,7 @@ fn test_declare_entry_points_not_sorted_by_selector( assert_eq!(tx_validator.validate(&tx), expected); let contract_class = ContractClass { - sierra_program: create_sierra_program(&MIN_SIERRA_VERSION), + sierra_program: create_sierra_program(min_sierra_version()), entry_points_by_type: EntryPointByType { constructor: vec![], external: entry_points.clone(), @@ -399,7 +412,7 @@ fn test_declare_entry_points_not_sorted_by_selector( assert_eq!(tx_validator.validate(&tx), expected); let contract_class = ContractClass { - sierra_program: create_sierra_program(&MIN_SIERRA_VERSION), + sierra_program: create_sierra_program(min_sierra_version()), entry_points_by_type: EntryPointByType { constructor: vec![], external: vec![], diff --git a/crates/gateway/src/test_utils.rs b/crates/gateway/src/test_utils.rs index 1fac568182..e502ea36ba 100644 --- a/crates/gateway/src/test_utils.rs +++ b/crates/gateway/src/test_utils.rs @@ -3,6 +3,7 @@ use starknet_types_core::felt::Felt; use crate::compiler_version::VersionId; pub fn create_sierra_program(version_id: &VersionId) -> Vec { + let version_id = version_id.0; vec![ // Sierra Version ID. Felt::from(u64::try_from(version_id.major).unwrap()), From 32e38594c1b20b5907cc22927b1da3e51cab50f0 Mon Sep 17 00:00:00 2001 From: matan-starkware <97523054+matan-starkware@users.noreply.github.com> Date: Sun, 18 Aug 2024 13:20:17 +0300 Subject: [PATCH 05/16] feat(consensus): simulations with the test config will use the fake consensus sync (#414) --- crates/papyrus_node/src/main.rs | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index b40384a3f9..c578bf58fe 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -7,6 +7,7 @@ use std::process::exit; use std::sync::Arc; use std::time::Duration; +use futures::stream::StreamExt; use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; @@ -40,14 +41,14 @@ use papyrus_sync::sources::base_layer::{BaseLayerSourceError, EthereumBaseLayerS use papyrus_sync::sources::central::{CentralError, CentralSource, CentralSourceConfig}; use papyrus_sync::sources::pending::PendingSource; use papyrus_sync::{StateSync, StateSyncError, SyncConfig}; -use starknet_api::block::BlockHash; +use starknet_api::block::{BlockHash, BlockNumber}; use starknet_api::felt; use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated}; use starknet_client::reader::PendingData; use tokio::sync::RwLock; use tokio::task::{JoinError, JoinHandle}; use tracing::metadata::LevelFilter; -use tracing::{debug_span, error, info, warn, Instrument}; +use tracing::{debug, debug_span, error, info, warn, Instrument}; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, EnvFilter}; @@ -103,17 +104,20 @@ fn run_consensus( info!("Consensus is disabled."); return Ok(tokio::spawn(pending())); }; + debug!("Consensus configuration: {config:?}"); let network_channels = network_manager .register_broadcast_topic(Topic::new(config.network_topic.clone()), BUFFER_SIZE)?; - let context = PapyrusConsensusContext::new( - storage_reader.clone(), - network_channels.messages_to_broadcast_sender, - config.num_validators, - None, - ); // TODO(matan): connect this to an actual channel. if let Some(test_config) = config.test.as_ref() { + let sync_channels = network_manager + .register_broadcast_topic(Topic::new(test_config.sync_topic.clone()), BUFFER_SIZE)?; + let context = PapyrusConsensusContext::new( + storage_reader.clone(), + network_channels.messages_to_broadcast_sender, + config.num_validators, + Some(sync_channels.messages_to_broadcast_sender), + ); let network_receiver = NetworkReceiver::new( network_channels.broadcasted_messages_receiver, test_config.cache_size, @@ -121,15 +125,25 @@ fn run_consensus( test_config.drop_probability, test_config.invalid_probability, ); + let sync_receiver = + sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| { + BlockNumber(vote.expect("Sync channel should never have errors").height) + }); Ok(tokio::spawn(papyrus_consensus::run_consensus( context, config.start_height, config.validator_id, config.consensus_delay, network_receiver, - futures::stream::pending(), + sync_receiver, ))) } else { + let context = PapyrusConsensusContext::new( + storage_reader.clone(), + network_channels.messages_to_broadcast_sender, + config.num_validators, + None, + ); Ok(tokio::spawn(papyrus_consensus::run_consensus( context, config.start_height, From c45b4e505e62a98deab29ca20dc772f76e41fa96 Mon Sep 17 00:00:00 2001 From: matan-starkware <97523054+matan-starkware@users.noreply.github.com> Date: Sun, 18 Aug 2024 13:34:29 +0300 Subject: [PATCH 06/16] refactor(consensus): rename notify_decision to decision_reached (#465) --- crates/sequencing/papyrus_consensus/src/manager.rs | 2 +- crates/sequencing/papyrus_consensus/src/manager_test.rs | 6 +++--- .../papyrus_consensus/src/papyrus_consensus_context.rs | 2 +- .../papyrus_consensus/src/papyrus_consensus_context_test.rs | 2 +- crates/sequencing/papyrus_consensus/src/test_utils.rs | 2 +- crates/sequencing/papyrus_consensus/src/types.rs | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 80df134a3c..3e24434eb7 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -65,7 +65,7 @@ where tokio::select! { decision = run_height => { let decision = decision?; - context.notify_decision(decision.block, decision.precommits).await?; + context.decision_reached(decision.block, decision.precommits).await?; current_height = current_height.unchecked_next(); }, sync_height = sync_height(current_height, &mut sync_receiver) => { diff --git a/crates/sequencing/papyrus_consensus/src/manager_test.rs b/crates/sequencing/papyrus_consensus/src/manager_test.rs index 6f26a5d138..337b475bb0 100644 --- a/crates/sequencing/papyrus_consensus/src/manager_test.rs +++ b/crates/sequencing/papyrus_consensus/src/manager_test.rs @@ -74,7 +74,7 @@ mock! { fin_receiver: oneshot::Receiver, ) -> Result<(), ConsensusError>; - async fn notify_decision( + async fn decision_reached( &mut self, block: TestBlock, precommits: Vec, @@ -184,7 +184,7 @@ async fn run_consensus_sync() { context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]); context.expect_proposer().returning(move |_, _| *PROPOSER_ID); context.expect_broadcast().returning(move |_| Ok(())); - context.expect_notify_decision().return_once(move |block, votes| { + context.expect_decision_reached().return_once(move |block, votes| { assert_eq!(block.id(), BlockHash(Felt::TWO)); assert_eq!(votes[0].height, 2); decision_tx.send(()).unwrap(); @@ -247,7 +247,7 @@ async fn run_consensus_sync_cancellation_safety() { Ok(()) }); context.expect_broadcast().returning(move |_| Ok(())); - context.expect_notify_decision().return_once(|block, votes| { + context.expect_decision_reached().return_once(|block, votes| { assert_eq!(block.id(), BlockHash(Felt::ONE)); assert_eq!(votes[0].height, 1); decision_tx.send(()).unwrap(); diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs index 33bd87b3ff..87d2e09253 100644 --- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs @@ -236,7 +236,7 @@ impl ConsensusContext for PapyrusConsensusContext { Ok(()) } - async fn notify_decision( + async fn decision_reached( &mut self, block: Self::Block, precommits: Vec, diff --git a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs index ef7cd27999..94050f1967 100644 --- a/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus/src/papyrus_consensus_context_test.rs @@ -104,7 +104,7 @@ async fn decision() { let (_, mut papyrus_context, _, mut sync_network) = test_setup(); let block = PapyrusConsensusBlock::default(); let precommit = Vote::default(); - papyrus_context.notify_decision(block, vec![precommit.clone()]).await.unwrap(); + papyrus_context.decision_reached(block, vec![precommit.clone()]).await.unwrap(); assert_eq!(sync_network.messages_to_broadcast_receiver.next().await.unwrap(), precommit); } diff --git a/crates/sequencing/papyrus_consensus/src/test_utils.rs b/crates/sequencing/papyrus_consensus/src/test_utils.rs index 5b1d3ce935..465014882b 100644 --- a/crates/sequencing/papyrus_consensus/src/test_utils.rs +++ b/crates/sequencing/papyrus_consensus/src/test_utils.rs @@ -58,7 +58,7 @@ mock! { fin_receiver: oneshot::Receiver, ) -> Result<(), ConsensusError>; - async fn notify_decision( + async fn decision_reached( &mut self, block: TestBlock, precommits: Vec, diff --git a/crates/sequencing/papyrus_consensus/src/types.rs b/crates/sequencing/papyrus_consensus/src/types.rs index bd81fe8c99..621563417e 100644 --- a/crates/sequencing/papyrus_consensus/src/types.rs +++ b/crates/sequencing/papyrus_consensus/src/types.rs @@ -140,7 +140,7 @@ pub trait ConsensusContext { /// - `block` identifies the decision. /// - `precommits` - All precommits must be for the same `(block.id(), height, round)` and form /// a quorum (>2/3 of the voting power) for this height. - async fn notify_decision( + async fn decision_reached( &mut self, block: Self::Block, precommits: Vec, From 19106af5366328732e35ba9f7fb1a4e77cb90f75 Mon Sep 17 00:00:00 2001 From: mohammad-starkware <130282237+MohammadNassar1@users.noreply.github.com> Date: Sun, 18 Aug 2024 13:35:14 +0300 Subject: [PATCH 07/16] refactor(mempool): transaction pool stores transaction (prev internal tx), API did not change (#341) --- crates/mempool/src/mempool.rs | 14 ++++++- crates/mempool/src/mempool_test.rs | 2 +- crates/mempool/src/transaction_pool.rs | 32 +++++++--------- crates/mempool_types/src/mempool_types.rs | 45 ++++++++++++++++++++++- 4 files changed, 70 insertions(+), 23 deletions(-) diff --git a/crates/mempool/src/mempool.rs b/crates/mempool/src/mempool.rs index 1178974d0f..06c399dfea 100644 --- a/crates/mempool/src/mempool.rs +++ b/crates/mempool/src/mempool.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use starknet_api::core::{ContractAddress, Nonce}; +use starknet_api::executable_transaction::Transaction; use starknet_api::transaction::{Tip, TransactionHash}; use starknet_mempool_types::errors::MempoolError; use starknet_mempool_types::mempool_types::{ @@ -68,7 +69,7 @@ impl Mempool { let mut eligible_txs: Vec = Vec::with_capacity(n_txs); for tx_ref in &eligible_tx_references { let tx = self.tx_pool.remove(tx_ref.tx_hash)?; - eligible_txs.push(tx); + eligible_txs.push((&tx).into()); } // Update the mempool state with the given transactions' nonces. @@ -137,7 +138,7 @@ impl Mempool { // Remove transactions with lower nonce than the account nonce. self.tx_pool.remove_up_to_nonce(sender_address, nonce); - self.tx_pool.insert(tx)?; + self.tx_pool.insert((&tx).into())?; // Maybe close nonce gap. if self.tx_queue.get_nonce(sender_address).is_none() { @@ -232,4 +233,13 @@ impl TransactionReference { tip: tx.tip, } } + + pub fn new(tx: &Transaction) -> Self { + TransactionReference { + sender_address: tx.contract_address(), + nonce: tx.nonce(), + tx_hash: tx.tx_hash(), + tip: tx.tip().expect("Expected a valid tip value, but received None."), + } + } } diff --git a/crates/mempool/src/mempool_test.rs b/crates/mempool/src/mempool_test.rs index 3683319aaf..168643f627 100644 --- a/crates/mempool/src/mempool_test.rs +++ b/crates/mempool/src/mempool_test.rs @@ -114,7 +114,7 @@ impl FromIterator for TransactionPool { fn from_iter>(txs: T) -> Self { let mut pool = Self::default(); for tx in txs { - pool.insert(tx).unwrap(); + pool.insert((&tx).into()).unwrap(); } pool } diff --git a/crates/mempool/src/transaction_pool.rs b/crates/mempool/src/transaction_pool.rs index 77833a42de..6e65e0ac51 100644 --- a/crates/mempool/src/transaction_pool.rs +++ b/crates/mempool/src/transaction_pool.rs @@ -1,18 +1,14 @@ use std::collections::{hash_map, BTreeMap, HashMap}; use starknet_api::core::{ContractAddress, Nonce}; +use starknet_api::executable_transaction::Transaction; use starknet_api::transaction::TransactionHash; use starknet_mempool_types::errors::MempoolError; -use starknet_mempool_types::mempool_types::{ - Account, - AccountState, - MempoolResult, - ThinTransaction, -}; +use starknet_mempool_types::mempool_types::{Account, AccountState, MempoolResult}; use crate::mempool::TransactionReference; -type HashToTransaction = HashMap; +type HashToTransaction = HashMap; /// Contains all transactions currently held in the mempool. /// Invariant: both data structures are consistent regarding the existence of transactions: @@ -27,8 +23,8 @@ pub struct TransactionPool { } impl TransactionPool { - pub fn insert(&mut self, tx: ThinTransaction) -> MempoolResult<()> { - let tx_reference = TransactionReference::new_from_thin_tx(&tx); + pub fn insert(&mut self, tx: Transaction) -> MempoolResult<()> { + let tx_reference = TransactionReference::new(&tx); let tx_hash = tx_reference.tx_hash; // Insert to pool. @@ -50,20 +46,18 @@ impl TransactionPool { Ok(()) } - pub fn remove(&mut self, tx_hash: TransactionHash) -> MempoolResult { + pub fn remove(&mut self, tx_hash: TransactionHash) -> MempoolResult { // Remove from pool. let tx = self.tx_pool.remove(&tx_hash).ok_or(MempoolError::TransactionNotFound { tx_hash })?; // Remove from account mapping. - self.txs_by_account.remove(TransactionReference::new_from_thin_tx(&tx)).unwrap_or_else( - || { - panic!( - "Transaction pool consistency error: transaction with hash {tx_hash} appears \ - in main mapping, but does not appear in the account mapping" - ) - }, - ); + self.txs_by_account.remove(TransactionReference::new(&tx)).unwrap_or_else(|| { + panic!( + "Transaction pool consistency error: transaction with hash {tx_hash} appears in \ + main mapping, but does not appear in the account mapping" + ) + }); Ok(tx) } @@ -81,7 +75,7 @@ impl TransactionPool { } } - pub fn _get_by_tx_hash(&self, tx_hash: TransactionHash) -> MempoolResult<&ThinTransaction> { + pub fn _get_by_tx_hash(&self, tx_hash: TransactionHash) -> MempoolResult<&Transaction> { self.tx_pool.get(&tx_hash).ok_or(MempoolError::TransactionNotFound { tx_hash }) } diff --git a/crates/mempool_types/src/mempool_types.rs b/crates/mempool_types/src/mempool_types.rs index 267eff773a..a2dbdfaf53 100644 --- a/crates/mempool_types/src/mempool_types.rs +++ b/crates/mempool_types/src/mempool_types.rs @@ -1,6 +1,16 @@ use serde::{Deserialize, Serialize}; use starknet_api::core::{ContractAddress, Nonce}; -use starknet_api::transaction::{Tip, TransactionHash}; +use starknet_api::data_availability::DataAvailabilityMode; +use starknet_api::executable_transaction::{InvokeTransaction, Transaction}; +use starknet_api::transaction::{ + AccountDeploymentData, + Calldata, + PaymasterData, + ResourceBoundsMapping, + Tip, + TransactionHash, + TransactionSignature, +}; use crate::errors::MempoolError; @@ -32,3 +42,36 @@ pub struct MempoolInput { } pub type MempoolResult = Result; + +impl From<&Transaction> for ThinTransaction { + fn from(tx: &Transaction) -> Self { + ThinTransaction { + sender_address: tx.contract_address(), + tx_hash: tx.tx_hash(), + tip: tx.tip().expect("Expected a valid tip value, but received None."), + nonce: tx.nonce(), + } + } +} + +impl From<&ThinTransaction> for Transaction { + fn from(tx: &ThinTransaction) -> Self { + Transaction::Invoke(InvokeTransaction { + tx: starknet_api::transaction::InvokeTransaction::V3( + starknet_api::transaction::InvokeTransactionV3 { + sender_address: tx.sender_address, + tip: tx.tip, + nonce: tx.nonce, + resource_bounds: ResourceBoundsMapping::default(), + signature: TransactionSignature::default(), + calldata: Calldata::default(), + nonce_data_availability_mode: DataAvailabilityMode::L1, + fee_data_availability_mode: DataAvailabilityMode::L1, + paymaster_data: PaymasterData::default(), + account_deployment_data: AccountDeploymentData::default(), + }, + ), + tx_hash: tx.tx_hash, + }) + } +} From 240c1f5c9cee95beed977a1ebeecb282be08ac5e Mon Sep 17 00:00:00 2001 From: Lev Roitman Date: Thu, 15 Aug 2024 16:52:08 +0300 Subject: [PATCH 08/16] feat: adding consensus manager to ComponentExecutionConfig and MempoolNodeConfig commit-id:3811a2b1 --- Cargo.lock | 1 + config/mempool/default_config.json | 50 +++++++++++++++++++ crates/mempool_node/Cargo.toml | 1 + crates/mempool_node/src/config/config_test.rs | 16 ++++-- crates/mempool_node/src/config/mod.rs | 27 +++++++++- 5 files changed, 91 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 42fbe5ae72..eda062ceca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9011,6 +9011,7 @@ dependencies = [ "serde_json", "starknet_batcher", "starknet_batcher_types", + "starknet_consensus_manager", "starknet_gateway", "starknet_mempool", "starknet_mempool_infra", diff --git a/config/mempool/default_config.json b/config/mempool/default_config.json index 2d8f6831c8..dfa3b6f36d 100644 --- a/config/mempool/default_config.json +++ b/config/mempool/default_config.json @@ -54,6 +54,51 @@ "privacy": "Public", "value": 3 }, + "components.consensus_manager.component_type": { + "description": "The component type.", + "privacy": "Public", + "value": "AsynchronousComponent" + }, + "components.consensus_manager.execute": { + "description": "The component execution flag.", + "privacy": "Public", + "value": true + }, + "components.consensus_manager.local_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": false + }, + "components.consensus_manager.local_config.channel_buffer_size": { + "description": "The communication channel buffer size.", + "privacy": "Public", + "value": 32 + }, + "components.consensus_manager.location": { + "description": "The component location.", + "privacy": "Public", + "value": "Local" + }, + "components.consensus_manager.remote_config.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": true + }, + "components.consensus_manager.remote_config.ip": { + "description": "The remote component server ip.", + "privacy": "Public", + "value": "0.0.0.0" + }, + "components.consensus_manager.remote_config.port": { + "description": "The remote component server port.", + "privacy": "Public", + "value": 8080 + }, + "components.consensus_manager.remote_config.retries": { + "description": "The max number of retries for sending a message.", + "privacy": "Public", + "value": 3 + }, "components.gateway.component_type": { "description": "The component type.", "privacy": "Public", @@ -144,6 +189,11 @@ "privacy": "Public", "value": 3 }, + "consensus_manager_config.consensus_config_param_1": { + "description": "The first consensus manager configuration parameter", + "privacy": "Public", + "value": 1 + }, "gateway_config.network_config.ip": { "description": "The gateway server ip.", "privacy": "Public", diff --git a/crates/mempool_node/Cargo.toml b/crates/mempool_node/Cargo.toml index ab746499ec..9a7f959429 100644 --- a/crates/mempool_node/Cargo.toml +++ b/crates/mempool_node/Cargo.toml @@ -18,6 +18,7 @@ rstest.workspace = true serde.workspace = true starknet_batcher.workspace = true starknet_batcher_types.workspace = true +starknet_consensus_manager.workspace = true starknet_gateway.workspace = true starknet_mempool.workspace = true starknet_mempool_infra.workspace = true diff --git a/crates/mempool_node/src/config/config_test.rs b/crates/mempool_node/src/config/config_test.rs index a520bc5445..70919b0f26 100644 --- a/crates/mempool_node/src/config/config_test.rs +++ b/crates/mempool_node/src/config/config_test.rs @@ -124,6 +124,10 @@ fn test_invalid_components_config() { // Initialize an invalid config and check that the validator finds an error. let component_config = ComponentConfig { batcher: ComponentExecutionConfig { execute: false, ..ComponentExecutionConfig::default() }, + consensus_manager: ComponentExecutionConfig { + execute: false, + ..ComponentExecutionConfig::default() + }, gateway: ComponentExecutionConfig { execute: false, ..ComponentExecutionConfig::default() }, mempool: ComponentExecutionConfig { execute: false, ..ComponentExecutionConfig::default() }, }; @@ -138,11 +142,13 @@ fn test_invalid_components_config() { /// Test the validation of the struct ComponentConfig. /// The validation validates at least one of the components is set with execute: true. #[rstest] -#[case(true, false, false)] -#[case(false, true, false)] -#[case(false, false, true)] +#[case(true, false, false, false)] +#[case(false, true, false, false)] +#[case(false, false, true, false)] +#[case(false, false, false, true)] fn test_valid_components_config( #[case] batcher_component_execute: bool, + #[case] consensus_manager_component_execute: bool, #[case] gateway_component_execute: bool, #[case] mempool_component_execute: bool, ) { @@ -152,6 +158,10 @@ fn test_valid_components_config( execute: batcher_component_execute, ..ComponentExecutionConfig::default() }, + consensus_manager: ComponentExecutionConfig { + execute: consensus_manager_component_execute, + ..ComponentExecutionConfig::default() + }, gateway: ComponentExecutionConfig { execute: gateway_component_execute, ..ComponentExecutionConfig::default() diff --git a/crates/mempool_node/src/config/mod.rs b/crates/mempool_node/src/config/mod.rs index e045ac7d34..9f7ff51809 100644 --- a/crates/mempool_node/src/config/mod.rs +++ b/crates/mempool_node/src/config/mod.rs @@ -16,6 +16,7 @@ use papyrus_config::loading::load_and_process_config; use papyrus_config::{ConfigError, ParamPath, ParamPrivacyInput, SerializedParam}; use serde::{Deserialize, Serialize}; use starknet_batcher::config::BatcherConfig; +use starknet_consensus_manager::config::ConsensusManagerConfig; use starknet_gateway::config::{GatewayConfig, RpcStateReaderConfig}; use starknet_mempool_infra::component_definitions::{ LocalComponentCommunicationConfig, @@ -137,6 +138,16 @@ impl ComponentExecutionConfig { remote_config: None, } } + + pub fn consensus_manager_default_config() -> Self { + Self { + execute: true, + location: LocationType::Local, + component_type: ComponentType::AsynchronousComponent, + local_config: Some(LocalComponentCommunicationConfig::default()), + remote_config: None, + } + } } pub fn validate_single_component_config( @@ -169,6 +180,8 @@ pub struct ComponentConfig { #[validate] pub batcher: ComponentExecutionConfig, #[validate] + pub consensus_manager: ComponentExecutionConfig, + #[validate] pub gateway: ComponentExecutionConfig, #[validate] pub mempool: ComponentExecutionConfig, @@ -178,6 +191,7 @@ impl Default for ComponentConfig { fn default() -> Self { Self { batcher: ComponentExecutionConfig::batcher_default_config(), + consensus_manager: ComponentExecutionConfig::consensus_manager_default_config(), gateway: ComponentExecutionConfig::gateway_default_config(), mempool: ComponentExecutionConfig::mempool_default_config(), } @@ -189,6 +203,7 @@ impl SerializeConfig for ComponentConfig { #[allow(unused_mut)] let mut sub_configs = vec![ append_sub_config_name(self.batcher.dump(), "batcher"), + append_sub_config_name(self.consensus_manager.dump(), "consensus_manager"), append_sub_config_name(self.gateway.dump(), "gateway"), append_sub_config_name(self.mempool.dump(), "mempool"), ]; @@ -198,7 +213,11 @@ impl SerializeConfig for ComponentConfig { } pub fn validate_components_config(components: &ComponentConfig) -> Result<(), ValidationError> { - if components.gateway.execute || components.mempool.execute || components.batcher.execute { + if components.gateway.execute + || components.mempool.execute + || components.batcher.execute + || components.consensus_manager.execute + { return Ok(()); } @@ -215,6 +234,8 @@ pub struct MempoolNodeConfig { #[validate] pub batcher_config: BatcherConfig, #[validate] + pub consensus_manager_config: ConsensusManagerConfig, + #[validate] pub gateway_config: GatewayConfig, #[validate] pub rpc_state_reader_config: RpcStateReaderConfig, @@ -228,6 +249,10 @@ impl SerializeConfig for MempoolNodeConfig { let mut sub_configs = vec![ append_sub_config_name(self.components.dump(), "components"), append_sub_config_name(self.batcher_config.dump(), "batcher_config"), + append_sub_config_name( + self.consensus_manager_config.dump(), + "consensus_manager_config", + ), append_sub_config_name(self.gateway_config.dump(), "gateway_config"), append_sub_config_name(self.rpc_state_reader_config.dump(), "rpc_state_reader_config"), append_sub_config_name(self.compiler_config.dump(), "compiler_config"), From bc4fc7dc17a472027a08bfb02a6be4f08f324076 Mon Sep 17 00:00:00 2001 From: matan-starkware <97523054+matan-starkware@users.noreply.github.com> Date: Sun, 18 Aug 2024 13:55:02 +0300 Subject: [PATCH 09/16] feat(consensus): add metric for sync in consensus and update the simulation script (#490) --- crates/papyrus_common/src/metrics.rs | 3 +++ .../papyrus_consensus/run_consensus.py | 21 +++++++++++-------- .../papyrus_consensus/src/manager.rs | 3 ++- .../src/simulation_network_receiver.rs | 4 ++++ .../src/single_height_consensus.rs | 4 ++-- 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/crates/papyrus_common/src/metrics.rs b/crates/papyrus_common/src/metrics.rs index b06f42fec8..cdb6e24a66 100644 --- a/crates/papyrus_common/src/metrics.rs +++ b/crates/papyrus_common/src/metrics.rs @@ -40,3 +40,6 @@ pub static COLLECT_PROFILING_METRICS: OnceLock = OnceLock::new(); /// The height consensus is currently working on. pub const PAPYRUS_CONSENSUS_HEIGHT: &str = "papyrus_consensus_height"; + +/// The number of times consensus has progressed due to the sync protocol. +pub const PAPYRUS_CONSENSUS_SYNC_COUNT: &str = "papyrus_consensus_sync_count"; diff --git a/crates/sequencing/papyrus_consensus/run_consensus.py b/crates/sequencing/papyrus_consensus/run_consensus.py index 1b19ab3cde..fbeb3a0ed5 100644 --- a/crates/sequencing/papyrus_consensus/run_consensus.py +++ b/crates/sequencing/papyrus_consensus/run_consensus.py @@ -22,6 +22,7 @@ def __init__(self, validator_id, monitoring_gateway_server_port, cmd): self.cmd = cmd self.process = None self.height_and_timestamp = (None, None) # (height, timestamp) + self.sync_count = None def start(self): self.process = subprocess.Popen(self.cmd, shell=True, preexec_fn=os.setsid) @@ -31,15 +32,17 @@ def stop(self): os.killpg(os.getpgid(self.process.pid), signal.SIGINT) self.process.wait() - def get_height(self): + def get_metric(self, metric: str): port = self.monitoring_gateway_server_port - command = f"curl -s -X GET http://localhost:{port}/monitoring/metrics | grep -oP 'papyrus_consensus_height \\K\\d+'" + command = f"curl -s -X GET http://localhost:{port}/monitoring/metrics | grep -oP '{metric} \\K\\d+'" result = subprocess.run(command, shell=True, capture_output=True, text=True) - # returns the latest decided height, or None if consensus has not yet started. return int(result.stdout) if result.stdout else None - def check_height(self): - height = self.get_height() + # Check the node's metrics and return the height and timestamp. + def check_node(self): + self.sync_count = self.get_metric("papyrus_consensus_sync_count") + + height = self.get_metric("papyrus_consensus_height") if self.height_and_timestamp[0] != height: if self.height_and_timestamp[0] is not None and height is not None: assert height > self.height_and_timestamp[0], "Height should be increasing." @@ -89,8 +92,8 @@ def monitor_simulation(nodes, start_time, duration, stagnation_timeout): return True stagnated_nodes = [] for node in nodes: - (height, last_update) = node.check_height() - print(f"Node: {node.validator_id}, height: {height}") + (height, last_update) = node.check_node() + print(f"Node: {node.validator_id}, height: {height}, sync_count: {node.sync_count}") if height is not None and (curr_time - last_update) > stagnation_timeout: stagnated_nodes.append(node.validator_id) if stagnated_nodes: @@ -107,7 +110,8 @@ def run_simulation(nodes, duration, stagnation_timeout): try: while True: time.sleep(MONITORING_PERIOD) - print(f"\nTime elapsed: {time.time() - start_time}s") + elapsed = round(time.time() - start_time) + print(f"\nTime elapsed: {elapsed}s") should_exit = monitor_simulation(nodes, start_time, duration, stagnation_timeout) if should_exit: break @@ -147,7 +151,6 @@ def build_node(data_dir, logs_dir, i, papryus_args): f"--network.secret_key {SECRET_KEY} " + f"2>&1 | sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {logs_dir}/validator{i}.txt" ) - else: cmd += ( f"--network.bootstrap_peer_multiaddr.#is_none false " diff --git a/crates/sequencing/papyrus_consensus/src/manager.rs b/crates/sequencing/papyrus_consensus/src/manager.rs index 3e24434eb7..a2c12ef79c 100644 --- a/crates/sequencing/papyrus_consensus/src/manager.rs +++ b/crates/sequencing/papyrus_consensus/src/manager.rs @@ -9,7 +9,7 @@ use std::time::Duration; use futures::channel::{mpsc, oneshot}; use futures::{Stream, StreamExt}; -use papyrus_common::metrics::PAPYRUS_CONSENSUS_HEIGHT; +use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT}; use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::{ConsensusMessage, Proposal}; use papyrus_protobuf::converters::ProtobufConversionError; @@ -69,6 +69,7 @@ where current_height = current_height.unchecked_next(); }, sync_height = sync_height(current_height, &mut sync_receiver) => { + metrics::increment_counter!(PAPYRUS_CONSENSUS_SYNC_COUNT); current_height = sync_height?.unchecked_next(); } } diff --git a/crates/sequencing/papyrus_consensus/src/simulation_network_receiver.rs b/crates/sequencing/papyrus_consensus/src/simulation_network_receiver.rs index bf784103da..ecb4135359 100644 --- a/crates/sequencing/papyrus_consensus/src/simulation_network_receiver.rs +++ b/crates/sequencing/papyrus_consensus/src/simulation_network_receiver.rs @@ -13,6 +13,7 @@ use papyrus_network::network_manager::ReportSender; use papyrus_protobuf::consensus::ConsensusMessage; use papyrus_protobuf::converters::ProtobufConversionError; use starknet_api::block::BlockHash; +use tracing::{debug, instrument}; /// Receiver used to help run simulations of consensus. It has 2 goals in mind: /// 1. Simulate network failures. @@ -66,6 +67,7 @@ where /// /// Applies `drop_probability` followed by `invalid_probability`. So the probability of an /// invalid message is `(1- drop_probability) * invalid_probability`. + #[instrument(skip(self), level = "debug")] pub fn filter_msg(&mut self, mut msg: ConsensusMessage) -> Option { if !matches!(msg, ConsensusMessage::Proposal(_)) { // TODO(matan): Add support for dropping/invalidating votes. @@ -73,6 +75,7 @@ where } if self.should_drop_msg(&msg) { + debug!("Dropping message"); return None; } @@ -109,6 +112,7 @@ where } fn invalidate_msg(&mut self, msg: &mut ConsensusMessage) { + debug!("Invalidating message"); // TODO(matan): Allow for invalid votes based on signature/sender_id. if let ConsensusMessage::Proposal(ref mut proposal) = msg { proposal.block_hash = BlockHash(proposal.block_hash.0 + 1); diff --git a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs index 8fba89dd93..fd74787f06 100644 --- a/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs +++ b/crates/sequencing/papyrus_consensus/src/single_height_consensus.rs @@ -77,8 +77,8 @@ impl SingleHeightConsensus { fin_receiver: oneshot::Receiver, ) -> Result>, ConsensusError> { debug!( - "Received proposal: proposal_height={}, proposer={:?}", - init.height.0, init.proposer + "Received proposal: height={}, round={}, proposer={:?}", + init.height.0, init.round, init.proposer ); let proposer_id = context.proposer(&self.validators, self.height); if init.height != self.height { From 769683b63d749fbac2aad39c63384a0d92236bf2 Mon Sep 17 00:00:00 2001 From: Uriel Korach Date: Sun, 18 Aug 2024 11:07:40 +0300 Subject: [PATCH 10/16] chore(mempool_infra): add documentation to remote component server --- .../remote_component_server.rs | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/crates/mempool_infra/src/component_server/remote_component_server.rs b/crates/mempool_infra/src/component_server/remote_component_server.rs index 1703ee3b1a..70dc099d6e 100644 --- a/crates/mempool_infra/src/component_server/remote_component_server.rs +++ b/crates/mempool_infra/src/component_server/remote_component_server.rs @@ -19,6 +19,89 @@ use crate::component_definitions::{ APPLICATION_OCTET_STREAM, }; +/// The `RemoteComponentServer` struct is a generic server that handles requests and responses for a +/// specified component. It receives requests, processes them using the provided component, and +/// sends back responses. The server needs to be started using the `start` function, which runs +/// indefinitely. +/// +/// # Type Parameters +/// +/// - `Component`: The type of the component that will handle the requests. This type must implement +/// the `ComponentRequestHandler` trait, which defines how the component processes requests and +/// generates responses. +/// - `Request`: The type of requests that the component will handle. This type must implement the +/// `serde::de::DeserializeOwned` (e.g. by using #[derive(Deserialize)]) trait. +/// - `Response`: The type of responses that the component will generate. This type must implement +/// the `Serialize` trait. +/// +/// # Fields +/// +/// - `component`: The component responsible for handling the requests and generating responses. +/// - `socket`: A socket address for the server to listen on. +/// +/// # Example +/// ```rust +/// // Example usage of the RemoteComponentServer +/// use async_trait::async_trait; +/// use serde::{Deserialize, Serialize}; +/// use starknet_mempool_infra::component_runner::{ComponentStartError, ComponentStarter}; +/// use tokio::task; +/// +/// use crate::starknet_mempool_infra::component_definitions::ComponentRequestHandler; +/// use crate::starknet_mempool_infra::component_server::{ +/// ComponentServerStarter, +/// RemoteComponentServer, +/// }; +/// +/// // Define your component +/// struct MyComponent {} +/// +/// #[async_trait] +/// impl ComponentStarter for MyComponent { +/// async fn start(&mut self) -> Result<(), ComponentStartError> { +/// Ok(()) +/// } +/// } +/// +/// // Define your request and response types +/// #[derive(Deserialize)] +/// struct MyRequest { +/// pub content: String, +/// } +/// +/// #[derive(Serialize)] +/// struct MyResponse { +/// pub content: String, +/// } +/// +/// // Define your request processing logic +/// #[async_trait] +/// impl ComponentRequestHandler for MyComponent { +/// async fn handle_request(&mut self, request: MyRequest) -> MyResponse { +/// MyResponse { content: request.content.clone() + " processed" } +/// } +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// // Instantiate the component. +/// let component = MyComponent {}; +/// +/// // Set the ip address and port of the server's socket. +/// let ip_address = std::net::IpAddr::V6(std::net::Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); +/// let port: u16 = 8080; +/// +/// // Instantiate the server. +/// let mut server = RemoteComponentServer::::new( +/// component, ip_address, port, +/// ); +/// +/// // Start the server in a new task. +/// task::spawn(async move { +/// server.start().await; +/// }); +/// } +/// ``` pub struct RemoteComponentServer where Component: ComponentRequestHandler + Send + 'static, From cd355d7ae12f939545e6c7e5934b12b3a2222a82 Mon Sep 17 00:00:00 2001 From: Uriel Korach Date: Thu, 15 Aug 2024 11:59:04 +0300 Subject: [PATCH 11/16] chore(mempool_infra): add documentation to remote component client --- .../remote_component_client.rs | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/crates/mempool_infra/src/component_client/remote_component_client.rs b/crates/mempool_infra/src/component_client/remote_component_client.rs index 625621a11b..c7cc5f8121 100644 --- a/crates/mempool_infra/src/component_client/remote_component_client.rs +++ b/crates/mempool_infra/src/component_client/remote_component_client.rs @@ -12,6 +12,59 @@ use serde::Serialize; use super::definitions::{ClientError, ClientResult}; use crate::component_definitions::APPLICATION_OCTET_STREAM; +/// The `RemoteComponentClient` struct is a generic client for sending component requests and +/// receiving responses asynchronously through HTTP connection. +/// +/// # Type Parameters +/// - `Request`: The type of the request. This type must implement the `serde::Serialize` trait. +/// - `Response`: The type of the response. This type must implement the +/// `serde::de::DeserializeOwned` (e.g. by using #[derive(Deserialize)]) trait. +/// +/// # Fields +/// - `uri`: URI address of the server. +/// - `client`: The inner HTTP client that initiates the connection to the server and manages it. +/// - `max_retries`: Configurable number of extra attempts to send a request to server in case of a +/// failure. +/// +/// # Example +/// ```rust +/// // Example usage of the RemoteComponentClient +/// +/// use serde::{Deserialize, Serialize}; +/// +/// use crate::starknet_mempool_infra::component_client::RemoteComponentClient; +/// +/// // Define your request and response types +/// #[derive(Serialize)] +/// struct MyRequest { +/// pub content: String, +/// } +/// +/// #[derive(Deserialize)] +/// struct MyResponse { +/// content: String, +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// // Create a channel for sending requests and receiving responses +/// // Instantiate the client. +/// let ip_address = std::net::IpAddr::V6(std::net::Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)); +/// let port: u16 = 8080; +/// let client = RemoteComponentClient::::new(ip_address, port, 2); +/// +/// // Instantiate a request. +/// let request = MyRequest { content: "Hello, world!".to_string() }; +/// +/// // Send the request; typically, the client should await for a response. +/// client.send(request); +/// } +/// ``` +/// +/// # Notes +/// - The `RemoteComponentClient` struct is designed to work in an asynchronous environment, +/// utilizing Tokio's async runtime and hyper framwork to send HTTP requests and receive HTTP +/// responses. pub struct RemoteComponentClient where Request: Serialize, From 3766781861a81d3dd8087300000c264d268c2199 Mon Sep 17 00:00:00 2001 From: Arnon Hod Date: Sun, 18 Aug 2024 14:38:40 +0300 Subject: [PATCH 12/16] chore: fix docstring of executable contract class structs (#491) --- .../blockifier/src/execution/contract_class.rs | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/crates/blockifier/src/execution/contract_class.rs b/crates/blockifier/src/execution/contract_class.rs index 1e257619b7..f9e3a349c5 100644 --- a/crates/blockifier/src/execution/contract_class.rs +++ b/crates/blockifier/src/execution/contract_class.rs @@ -43,13 +43,9 @@ use crate::transaction::errors::TransactionExecutionError; #[path = "contract_class_test.rs"] pub mod test; -/// Represents a runnable Starknet contract class (meaning, the program is runnable by the VM). -/// We wrap the actual class in an Arc to avoid cloning the program when cloning the class. -// Note: when deserializing from a SN API class JSON string, the ABI field is ignored -// by serde, since it is not required for execution. - pub type ContractClassResult = Result; +/// Represents a runnable Starknet contract class (meaning, the program is runnable by the VM). #[derive(Clone, Debug, Eq, PartialEq, derive_more::From)] pub enum ContractClass { V0(ContractClassV0), @@ -103,6 +99,12 @@ impl ContractClass { } // V0. + +/// Represents a runnable Cario 0 Starknet contract class (meaning, the program is runnable by the +/// VM). We wrap the actual class in an Arc to avoid cloning the program when cloning the +/// class. +// Note: when deserializing from a SN API class JSON string, the ABI field is ignored +// by serde, since it is not required for execution. #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq)] pub struct ContractClassV0(pub Arc); impl Deref for ContractClassV0 { @@ -170,6 +172,10 @@ impl TryFrom for ContractClassV0 { } // V1. + +/// Represents a runnable Cario (Cairo 1) Starknet contract class (meaning, the program is runnable +/// by the VM). We wrap the actual class in an Arc to avoid cloning the program when cloning the +/// class. #[derive(Clone, Debug, Eq, PartialEq)] pub struct ContractClassV1(pub Arc); impl Deref for ContractClassV1 { From 07e89e46deb36ead2da540fe18dccd0aeb591c99 Mon Sep 17 00:00:00 2001 From: DvirYo-starkware <115620476+DvirYo-starkware@users.noreply.github.com> Date: Sun, 18 Aug 2024 16:47:52 +0300 Subject: [PATCH 13/16] feat(storage): more logs and metrics for storage writes (#487) --- crates/papyrus_storage/src/db/mod.rs | 2 ++ crates/papyrus_storage/src/state/mod.rs | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/papyrus_storage/src/db/mod.rs b/crates/papyrus_storage/src/db/mod.rs index 725f3f7517..2573ebb7b0 100644 --- a/crates/papyrus_storage/src/db/mod.rs +++ b/crates/papyrus_storage/src/db/mod.rs @@ -33,6 +33,7 @@ use libmdbx::{DatabaseFlags, Geometry, PageSize, WriteMap}; use papyrus_config::dumping::{ser_param, SerializeConfig}; use papyrus_config::validators::{validate_ascii, validate_path_exists}; use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; +use papyrus_proc_macros::latency_histogram; use serde::{Deserialize, Serialize}; use starknet_api::core::ChainId; use validator::Validate; @@ -261,6 +262,7 @@ impl DbWriter { type DbWriteTransaction<'env> = DbTransaction<'env, RW>; impl<'a> DbWriteTransaction<'a> { + #[latency_histogram("storage_commit_inner_db_latency_seconds", false)] pub(crate) fn commit(self) -> DbResult<()> { self.txn.commit()?; Ok(()) diff --git a/crates/papyrus_storage/src/state/mod.rs b/crates/papyrus_storage/src/state/mod.rs index ddbbcc1c4d..5a872c29ec 100644 --- a/crates/papyrus_storage/src/state/mod.rs +++ b/crates/papyrus_storage/src/state/mod.rs @@ -485,6 +485,7 @@ impl<'env> StateStorageWriter for StorageTxn<'env, RW> { Ok(self) } + #[latency_histogram("storage_revert_state_diff_latency_seconds", false)] fn revert_state_diff( self, block_number: BlockNumber, @@ -658,7 +659,7 @@ fn write_deployed_contracts<'env>( Ok(()) } -#[latency_histogram("storage_write_nonce_latency_seconds", true)] +#[latency_histogram("storage_write_nonce_latency_seconds", false)] fn write_nonces<'env>( nonces: &IndexMap, txn: &DbTransaction<'env, RW>, @@ -799,6 +800,7 @@ fn delete_deployed_contracts<'env>( Ok(()) } +#[latency_histogram("storage_delete_storage_diffs_latency_seconds", false)] fn delete_storage_diffs<'env>( txn: &'env DbTransaction<'env, RW>, block_number: BlockNumber, @@ -813,6 +815,7 @@ fn delete_storage_diffs<'env>( Ok(()) } +#[latency_histogram("storage_delete_nonces_latency_seconds", false)] fn delete_nonces<'env>( txn: &'env DbTransaction<'env, RW>, block_number: BlockNumber, From 0b880139e1be94685e64657a571e6ab94f4e50bb Mon Sep 17 00:00:00 2001 From: Ayelet Zilber <138376632+ayeletstarkware@users.noreply.github.com> Date: Sun, 18 Aug 2024 17:04:04 +0300 Subject: [PATCH 14/16] refactor(mempool): delete new function since it's not in use (#475) --- crates/mempool/src/mempool.rs | 9 --------- crates/mempool/src/mempool_test.rs | 13 ------------- 2 files changed, 22 deletions(-) diff --git a/crates/mempool/src/mempool.rs b/crates/mempool/src/mempool.rs index 06c399dfea..907bfa869b 100644 --- a/crates/mempool/src/mempool.rs +++ b/crates/mempool/src/mempool.rs @@ -31,15 +31,6 @@ pub struct Mempool { } impl Mempool { - pub fn new(inputs: impl IntoIterator) -> MempoolResult { - let mut mempool = Mempool::empty(); - - for input in inputs { - mempool.insert_tx(input)?; - } - Ok(mempool) - } - pub fn empty() -> Self { Mempool::default() } diff --git a/crates/mempool/src/mempool_test.rs b/crates/mempool/src/mempool_test.rs index 168643f627..33cb4ae8eb 100644 --- a/crates/mempool/src/mempool_test.rs +++ b/crates/mempool/src/mempool_test.rs @@ -184,19 +184,6 @@ fn mempool() -> Mempool { // Tests. -// new method tests. - -#[test] -fn test_new_with_duplicate_tx() { - let input = add_tx_input!(tip: 0, tx_hash: 1); - let same_input = input.clone(); - - assert!(matches!( - Mempool::new([input, same_input]), - Err(MempoolError::DuplicateTransaction { .. }) - )); -} - // get_txs tests. #[rstest] From 7e04ae857ce720d6265451f201d0f91a22e4fa68 Mon Sep 17 00:00:00 2001 From: Shahak Shama Date: Thu, 15 Aug 2024 09:58:57 +0300 Subject: [PATCH 15/16] feat(network): hardcode the external address of the node --- config/papyrus/default_config.json | 10 +++++++ .../papyrus_network/src/e2e_broadcast_test.rs | 2 +- crates/papyrus_network/src/lib.rs | 11 ++++++++ .../src/network_manager/mod.rs | 26 ++++++++++++++++--- .../src/network_manager/swarm_trait.rs | 3 ++- .../src/network_manager/test.rs | 8 +++--- ...fig__config_test__dump_default_config.snap | 10 +++++++ 7 files changed, 60 insertions(+), 10 deletions(-) diff --git a/config/papyrus/default_config.json b/config/papyrus/default_config.json index 81f897e70d..9a0f2ddcb0 100644 --- a/config/papyrus/default_config.json +++ b/config/papyrus/default_config.json @@ -179,6 +179,16 @@ "pointer_target": "chain_id", "privacy": "Public" }, + "network.hardcoded_external_multiaddr": { + "description": "The external address other peers see this node. If this is set, the node will not try to find out which addresses it has and will write this address as external instead", + "privacy": "Public", + "value": "" + }, + "network.hardcoded_external_multiaddr.#is_none": { + "description": "Flag for an optional field.", + "privacy": "TemporaryValue", + "value": true + }, "network.idle_connection_timeout": { "description": "Amount of time in seconds that a connection with no active sessions will stay alive.", "privacy": "Public", diff --git a/crates/papyrus_network/src/e2e_broadcast_test.rs b/crates/papyrus_network/src/e2e_broadcast_test.rs index 5895427e65..9eabf83d50 100644 --- a/crates/papyrus_network/src/e2e_broadcast_test.rs +++ b/crates/papyrus_network/src/e2e_broadcast_test.rs @@ -44,7 +44,7 @@ async fn create_swarm(bootstrap_peer_multiaddr: Option) -> Swarm, ) -> GenericNetworkManager> { - GenericNetworkManager::generic_new(swarm) + GenericNetworkManager::generic_new(swarm, None) } const BUFFER_SIZE: usize = 100; diff --git a/crates/papyrus_network/src/lib.rs b/crates/papyrus_network/src/lib.rs index 696118710e..0ce213ba9b 100644 --- a/crates/papyrus_network/src/lib.rs +++ b/crates/papyrus_network/src/lib.rs @@ -44,6 +44,7 @@ pub struct NetworkConfig { #[validate(custom = "validate_vec_u256")] #[serde(deserialize_with = "deserialize_optional_vec_u8")] pub(crate) secret_key: Option>, + pub hardcoded_external_multiaddr: Option, pub chain_id: ChainId, } @@ -96,6 +97,15 @@ impl SerializeConfig for NetworkConfig { will be used.", ParamPrivacyInput::Private, )]); + config.extend(ser_optional_param( + &self.bootstrap_peer_multiaddr, + Multiaddr::empty(), + "hardcoded_external_multiaddr", + "The external address other peers see this node. If this is set, the node will not \ + try to find out which addresses it has and will write this address as external \ + instead", + ParamPrivacyInput::Public, + )); config } } @@ -109,6 +119,7 @@ impl Default for NetworkConfig { idle_connection_timeout: Duration::from_secs(120), bootstrap_peer_multiaddr: None, secret_key: None, + hardcoded_external_multiaddr: None, chain_id: ChainId::Mainnet, } } diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index a71c8880c7..b6e62077e6 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -52,6 +52,7 @@ pub struct GenericNetworkManager { messages_to_broadcast_receivers: StreamHashMap>, broadcasted_messages_senders: HashMap>, reported_peer_receivers: FuturesUnordered>>, + hardcoded_external_multiaddr: Option, // Fields for metrics num_active_inbound_sessions: usize, num_active_outbound_sessions: usize, @@ -74,10 +75,18 @@ impl GenericNetworkManager { } } - pub(crate) fn generic_new(swarm: SwarmT) -> Self { + // TODO(shahak): remove the hardcoded_external_multiaddr arg once we manage external addresses + // in a behaviour. + pub(crate) fn generic_new( + mut swarm: SwarmT, + hardcoded_external_multiaddr: Option, + ) -> Self { gauge!(papyrus_metrics::PAPYRUS_NUM_CONNECTED_PEERS, 0f64); let reported_peer_receivers = FuturesUnordered::new(); reported_peer_receivers.push(futures::future::pending().boxed()); + if let Some(address) = hardcoded_external_multiaddr.clone() { + swarm.add_external_address(address); + } Self { swarm, inbound_protocol_to_buffer_size: HashMap::new(), @@ -89,6 +98,7 @@ impl GenericNetworkManager { messages_to_broadcast_receivers: StreamHashMap::new(HashMap::new()), broadcasted_messages_senders: HashMap::new(), reported_peer_receivers, + hardcoded_external_multiaddr, num_active_inbound_sessions: 0, num_active_outbound_sessions: 0, } @@ -258,13 +268,14 @@ impl GenericNetworkManager { } SwarmEvent::NewListenAddr { address, .. } => { // TODO(shahak): Find a better way to filter private addresses. - if !is_localhost(&address) { + if !is_localhost(&address) && self.hardcoded_external_multiaddr.is_none() { self.swarm.add_external_address(address); } } SwarmEvent::IncomingConnection { .. } | SwarmEvent::Dialing { .. } - | SwarmEvent::NewExternalAddrCandidate { .. } => {} + | SwarmEvent::NewExternalAddrCandidate { .. } + | SwarmEvent::NewExternalAddrOfPeer { .. } => {} _ => { error!("Unexpected event {event:?}"); } @@ -556,6 +567,7 @@ impl NetworkManager { session_timeout, idle_connection_timeout, bootstrap_peer_multiaddr, + hardcoded_external_multiaddr, secret_key, chain_id, } = config; @@ -565,6 +577,7 @@ impl NetworkManager { // format!("/ip4/0.0.0.0/udp/{quic_port}/quic-v1"), format!("/ip4/0.0.0.0/tcp/{tcp_port}"), ]; + let swarm = build_swarm(listen_addresses, idle_connection_timeout, secret_key, |key| { mixed_behaviour::MixedBehaviour::new( key, @@ -573,7 +586,12 @@ impl NetworkManager { chain_id, ) }); - Self::generic_new(swarm) + let hardcoded_external_multiaddr = hardcoded_external_multiaddr.map(|address| { + address.with_p2p(*swarm.local_peer_id()).expect( + "hardcoded_external_multiaddr has a peer id different than the local peer id", + ) + }); + Self::generic_new(swarm, hardcoded_external_multiaddr) } pub fn get_local_peer_id(&self) -> String { diff --git a/crates/papyrus_network/src/network_manager/swarm_trait.rs b/crates/papyrus_network/src/network_manager/swarm_trait.rs index 270154f962..fde1e0e713 100644 --- a/crates/papyrus_network/src/network_manager/swarm_trait.rs +++ b/crates/papyrus_network/src/network_manager/swarm_trait.rs @@ -3,7 +3,7 @@ use libp2p::gossipsub::{SubscriptionError, TopicHash}; use libp2p::swarm::dial_opts::DialOpts; use libp2p::swarm::{DialError, NetworkBehaviour, SwarmEvent}; use libp2p::{Multiaddr, PeerId, StreamProtocol, Swarm}; -use tracing::error; +use tracing::{error, info}; use crate::gossipsub_impl::Topic; use crate::mixed_behaviour; @@ -102,6 +102,7 @@ impl SwarmTrait for Swarm { } fn add_external_address(&mut self, address: Multiaddr) { + info!("Found new external address of this node: {address:?}"); self.add_external_address(address); } diff --git a/crates/papyrus_network/src/network_manager/test.rs b/crates/papyrus_network/src/network_manager/test.rs index a2b8851836..c4ecb9519f 100644 --- a/crates/papyrus_network/src/network_manager/test.rs +++ b/crates/papyrus_network/src/network_manager/test.rs @@ -217,7 +217,7 @@ async fn register_sqmr_protocol_client_and_use_channels() { mock_swarm.first_polled_event_notifier = Some(event_notifier); // network manager to register subscriber - let mut network_manager = GenericNetworkManager::generic_new(mock_swarm); + let mut network_manager = GenericNetworkManager::generic_new(mock_swarm, None); // register subscriber and send payload let mut payload_sender = network_manager.register_sqmr_protocol_client::, Vec>( @@ -279,7 +279,7 @@ async fn process_incoming_query() { let get_responses_fut = mock_swarm.get_responses_sent_to_inbound_session(inbound_session_id); let mut get_supported_inbound_protocol_fut = mock_swarm.get_supported_inbound_protocol(); - let mut network_manager = GenericNetworkManager::generic_new(mock_swarm); + let mut network_manager = GenericNetworkManager::generic_new(mock_swarm, None); let mut inbound_payload_receiver = network_manager .register_sqmr_protocol_server::, Vec>(protocol.to_string(), BUFFER_SIZE); @@ -315,7 +315,7 @@ async fn broadcast_message() { let mut mock_swarm = MockSwarm::default(); let mut messages_we_broadcasted_stream = mock_swarm.stream_messages_we_broadcasted(); - let mut network_manager = GenericNetworkManager::generic_new(mock_swarm); + let mut network_manager = GenericNetworkManager::generic_new(mock_swarm, None); let mut messages_to_broadcast_sender = network_manager .register_broadcast_topic(topic.clone(), BUFFER_SIZE) @@ -351,7 +351,7 @@ async fn receive_broadcasted_message_and_report_it() { ))); let mut reported_peer_receiver = mock_swarm.get_reported_peers_stream(); - let mut network_manager = GenericNetworkManager::generic_new(mock_swarm); + let mut network_manager = GenericNetworkManager::generic_new(mock_swarm, None); let mut broadcasted_messages_receiver = network_manager .register_broadcast_topic::(topic.clone(), BUFFER_SIZE) diff --git a/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap b/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap index 446e56caa8..b4a60c52ca 100644 --- a/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap +++ b/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap @@ -203,6 +203,16 @@ expression: dumped_default_config "value": "SN_MAIN", "privacy": "Public" }, + "network.hardcoded_external_multiaddr": { + "description": "The external address other peers see this node. If this is set, the node will not try to find out which addresses it has and will write this address as external instead", + "value": "", + "privacy": "Public" + }, + "network.hardcoded_external_multiaddr.#is_none": { + "description": "Flag for an optional field.", + "value": true, + "privacy": "TemporaryValue" + }, "network.idle_connection_timeout": { "description": "Amount of time in seconds that a connection with no active sessions will stay alive.", "value": { From 6d48a42197342c5f6df5d02a4b1647e4e9271ef8 Mon Sep 17 00:00:00 2001 From: Shahak Shama Date: Thu, 15 Aug 2024 10:55:27 +0300 Subject: [PATCH 16/16] fix(network): filter out localhost when identify notifies kademlia --- .../src/discovery/identify_impl.rs | 13 ++++++++++++- .../papyrus_network/src/discovery/kad_impl.rs | 6 +++++- .../papyrus_network/src/network_manager/mod.rs | 17 ++--------------- crates/papyrus_network/src/utils.rs | 16 +++++++++++++++- 4 files changed, 34 insertions(+), 18 deletions(-) diff --git a/crates/papyrus_network/src/discovery/identify_impl.rs b/crates/papyrus_network/src/discovery/identify_impl.rs index 85b342382b..945aed2942 100644 --- a/crates/papyrus_network/src/discovery/identify_impl.rs +++ b/crates/papyrus_network/src/discovery/identify_impl.rs @@ -2,6 +2,7 @@ use libp2p::{identify, Multiaddr, PeerId}; use crate::mixed_behaviour; use crate::mixed_behaviour::BridgedBehaviour; +use crate::utils::is_localhost; pub const IDENTIFY_PROTOCOL_VERSION: &str = "/staknet/identify/0.1.0-rc.0"; @@ -14,11 +15,21 @@ impl From for mixed_behaviour::Event { fn from(event: identify::Event) -> Self { match event { identify::Event::Received { peer_id, info } => { + // Filtering out localhost since it might collide with our own listen address if we + // use the same port. + // No need to filter out in discovery since there the address comes from the + // config, so if the user specified it they should make sure it doesn't collide + // with our own address + let listen_addresses = info + .listen_addrs + .into_iter() + .filter(|address| !is_localhost(address)) + .collect(); mixed_behaviour::Event::ToOtherBehaviourEvent( mixed_behaviour::ToOtherBehaviourEvent::Identify( IdentifyToOtherBehaviourEvent::FoundListenAddresses { peer_id, - listen_addresses: info.listen_addrs, + listen_addresses, }, ), ) diff --git a/crates/papyrus_network/src/discovery/kad_impl.rs b/crates/papyrus_network/src/discovery/kad_impl.rs index 6aa77baf1b..17b72d3063 100644 --- a/crates/papyrus_network/src/discovery/kad_impl.rs +++ b/crates/papyrus_network/src/discovery/kad_impl.rs @@ -1,5 +1,5 @@ use libp2p::kad; -use tracing::error; +use tracing::{error, info}; use super::identify_impl::IdentifyToOtherBehaviourEvent; use crate::mixed_behaviour::BridgedBehaviour; @@ -48,6 +48,10 @@ impl BridgedBehaviour for kad: | mixed_behaviour::ToOtherBehaviourEvent::Discovery( super::ToOtherBehaviourEvent::FoundListenAddresses { peer_id, listen_addresses }, ) => { + info!( + "Adding new listen addresses to routing table for peer {peer_id:?}: \ + {listen_addresses:?}" + ); for address in listen_addresses { self.add_address(peer_id, address.clone()); } diff --git a/crates/papyrus_network/src/network_manager/mod.rs b/crates/papyrus_network/src/network_manager/mod.rs index b6e62077e6..9ea3ba39bf 100644 --- a/crates/papyrus_network/src/network_manager/mod.rs +++ b/crates/papyrus_network/src/network_manager/mod.rs @@ -1,5 +1,4 @@ mod swarm_trait; -use core::net::Ipv4Addr; #[cfg(test)] mod test; @@ -14,10 +13,9 @@ use futures::future::{ready, BoxFuture, Ready}; use futures::sink::With; use futures::stream::{self, FuturesUnordered, Map, Stream}; use futures::{pin_mut, FutureExt, Sink, SinkExt, StreamExt}; -use libp2p::core::multiaddr::Protocol; use libp2p::gossipsub::{SubscriptionError, TopicHash}; use libp2p::swarm::SwarmEvent; -use libp2p::{Multiaddr, PeerId, StreamProtocol, Swarm}; +use libp2p::{PeerId, StreamProtocol, Swarm}; use metrics::gauge; use papyrus_common::metrics as papyrus_metrics; use sqmr::Bytes; @@ -28,7 +26,7 @@ use crate::bin_utils::build_swarm; use crate::gossipsub_impl::Topic; use crate::mixed_behaviour::{self, BridgedBehaviour}; use crate::sqmr::{self, InboundSessionId, OutboundSessionId, SessionId}; -use crate::utils::StreamHashMap; +use crate::utils::{is_localhost, StreamHashMap}; use crate::{gossipsub_impl, NetworkConfig}; #[derive(thiserror::Error, Debug)] @@ -924,14 +922,3 @@ pub struct TestSubscriberChannels> { pub subscriber_channels: BroadcastSubscriberChannels, pub mock_network: BroadcastNetworkMock, } - -fn is_localhost(address: &Multiaddr) -> bool { - let maybe_ip4_address = address.iter().find_map(|protocol| match protocol { - Protocol::Ip4(ip4_address) => Some(ip4_address), - _ => None, - }); - let Some(ip4_address) = maybe_ip4_address else { - return false; - }; - ip4_address == Ipv4Addr::LOCALHOST -} diff --git a/crates/papyrus_network/src/utils.rs b/crates/papyrus_network/src/utils.rs index 1a73734a69..bc0a3ab971 100644 --- a/crates/papyrus_network/src/utils.rs +++ b/crates/papyrus_network/src/utils.rs @@ -1,3 +1,4 @@ +use core::net::Ipv4Addr; use std::collections::hash_map::{Keys, ValuesMut}; use std::collections::{HashMap, HashSet}; use std::hash::Hash; @@ -5,11 +6,13 @@ use std::pin::Pin; use std::task::{Context, Poll, Waker}; use futures::stream::{Stream, StreamExt}; +use libp2p::core::multiaddr::Protocol; +use libp2p::Multiaddr; // This is an implementation of `StreamMap` from tokio_stream. The reason we're implementing it // ourselves is that the implementation in tokio_stream requires that the values implement the // Stream trait from tokio_stream and not from futures. -pub(crate) struct StreamHashMap { +pub struct StreamHashMap { map: HashMap, wakers_waiting_for_new_stream: Vec, } @@ -66,3 +69,14 @@ impl Stream for StreamHashMap bool { + let maybe_ip4_address = address.iter().find_map(|protocol| match protocol { + Protocol::Ip4(ip4_address) => Some(ip4_address), + _ => None, + }); + let Some(ip4_address) = maybe_ip4_address else { + return false; + }; + ip4_address == Ipv4Addr::LOCALHOST +}