From 6e6c72d99c3874fa5af22daba4aa4d1f66595526 Mon Sep 17 00:00:00 2001 From: mwtian <81660174+mwtian@users.noreply.github.com> Date: Fri, 6 Dec 2024 08:03:01 -0800 Subject: [PATCH] [consensus] support committee with 1 node (#20530) ## Description Obviously there is no fault tolerance. But this should be useful for running local cluster cheaply. `min_round_delay` should be set to a higher value for local cluster so not too many consensus blocks are built. ## Test plan Add unit and simtest. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- consensus/core/src/authority_node.rs | 102 +++++++++++++++++- consensus/core/src/core.rs | 20 +--- consensus/core/src/dag_state.rs | 15 +-- consensus/core/src/round_prober.rs | 2 +- consensus/core/src/synchronizer.rs | 26 +++-- crates/sui-benchmark/tests/simtest.rs | 14 ++- .../src/unit_tests/mysticeti_manager_tests.rs | 2 +- 7 files changed, 140 insertions(+), 41 deletions(-) diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index 1b8b818b79721..706f8154f49e8 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -256,7 +256,8 @@ where block_manager, // For streaming RPC, Core will be notified when consumer is available. // For non-streaming RPC, there is no way to know so default to true. - !N::Client::SUPPORT_STREAMING, + // When there is only one (this) authority, assume subscriber exists. 
+ !N::Client::SUPPORT_STREAMING || context.committee.size() == 1, commit_observer, core_signals, protocol_keypair, @@ -573,6 +574,105 @@ mod tests { } } + #[rstest] + #[tokio::test(flavor = "current_thread")] + async fn test_small_committee( + #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork, + #[values(1, 2, 3)] num_authorities: usize, + ) { + let db_registry = Registry::new(); + DBMetrics::init(&db_registry); + + let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]); + let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE(); + + let temp_dirs = (0..num_authorities) + .map(|_| TempDir::new().unwrap()) + .collect::<Vec<_>>(); + + let mut output_receivers = Vec::with_capacity(committee.size()); + let mut authorities: Vec<ConsensusAuthority> = Vec::with_capacity(committee.size()); + let mut boot_counters = vec![0; num_authorities]; + + for (index, _authority_info) in committee.authorities() { + let (authority, receiver) = make_authority( + index, + &temp_dirs[index.value()], + committee.clone(), + keypairs.clone(), + network_type, + boot_counters[index], + protocol_config.clone(), + ) + .await; + boot_counters[index] += 1; + output_receivers.push(receiver); + authorities.push(authority); + } + + const NUM_TRANSACTIONS: u8 = 15; + let mut submitted_transactions = BTreeSet::<Vec<u8>>::new(); + for i in 0..NUM_TRANSACTIONS { + let txn = vec![i; 16]; + submitted_transactions.insert(txn.clone()); + authorities[i as usize % authorities.len()] + .transaction_client() + .submit(vec![txn]) + .await + .unwrap(); + } + + for receiver in &mut output_receivers { + let mut expected_transactions = submitted_transactions.clone(); + loop { + let committed_subdag = + tokio::time::timeout(Duration::from_secs(1), receiver.recv()) + .await + .unwrap() + .unwrap(); + for b in committed_subdag.blocks { + for txn in b.transactions().iter().map(|t| t.data().to_vec()) { + assert!( + expected_transactions.remove(&txn), + 
"Transaction not submitted or already seen: {:?}", + txn + ); + } + } + assert_eq!(committed_subdag.reputation_scores_desc, vec![]); + if expected_transactions.is_empty() { + break; + } + } + } + + // Stop authority 0. + let index = committee.to_authority_index(0).unwrap(); + authorities.remove(index.value()).stop().await; + sleep(Duration::from_secs(10)).await; + + // Restart authority 0 and let it run. + let (authority, receiver) = make_authority( + index, + &temp_dirs[index.value()], + committee.clone(), + keypairs.clone(), + network_type, + boot_counters[index], + protocol_config.clone(), + ) + .await; + boot_counters[index] += 1; + output_receivers[index] = receiver; + authorities.insert(index.value(), authority); + sleep(Duration::from_secs(10)).await; + + // Stop all authorities and exit. + for authority in authorities { + authority.stop().await; + } + } + #[rstest] #[tokio::test(flavor = "current_thread")] async fn test_amnesia_recovery_success( diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index b798d367339f9..70c2909483a5a 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -131,10 +131,7 @@ impl Core { .with_pipeline(true) .build(); - // Recover the last proposed block - let last_proposed_block = dag_state - .read() - .get_last_block_for_authority(context.own_index); + let last_proposed_block = dag_state.read().get_last_proposed_block(); // Recover the last included ancestor rounds based on the last proposed block. That will allow // to perform the next block proposal by using ancestor blocks of higher rounds and avoid @@ -207,10 +204,6 @@ impl Core { "Waiting for {} ms while recovering ancestors from storage", wait_ms ); - println!( - "Waiting for {} ms while recovering ancestors from storage", - wait_ms - ); std::thread::sleep(Duration::from_millis(wait_ms)); } // Recover the last available quorum to correctly advance the threshold clock. 
@@ -223,13 +216,10 @@ impl Core { { last_proposed_block } else { - let last_proposed_block = self - .dag_state - .read() - .get_last_block_for_authority(self.context.own_index); + let last_proposed_block = self.dag_state.read().get_last_proposed_block(); if self.should_propose() { - assert!(last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher that genesis should have been produced during recovery"); + assert!(last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher than genesis should have been produced during recovery"); } // if no new block proposed then just re-broadcast the last proposed one to ensure liveness. @@ -1001,9 +991,7 @@ impl Core { } fn last_proposed_block(&self) -> VerifiedBlock { - self.dag_state - .read() - .get_last_block_for_authority(self.context.own_index) + self.dag_state.read().get_last_proposed_block() } } diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 1fc9a30a872e3..2c09b29655710 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -452,7 +452,13 @@ impl DagState { blocks.first().cloned().unwrap() } - /// Retrieves the last block proposed for the specified `authority`. If no block is found in cache + /// Gets the last proposed block from this authority. + /// If no block is proposed yet, returns the genesis block. + pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock { + self.get_last_block_for_authority(self.context.own_index) + } + + /// Retrieves the last accepted block from the specified `authority`. If no block is found in cache /// then the genesis block is returned as no other block has been received from that authority. 
pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock { if let Some(last) = self.recent_refs_by_authority[authority].last() { @@ -2209,12 +2215,7 @@ mod test { .find(|block| block.author() == context.own_index) .unwrap(); - assert_eq!( - dag_state - .read() - .get_last_block_for_authority(context.own_index), - my_genesis - ); + assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis); } // WHEN adding some blocks for authorities, only the last ones should be returned diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs index 9fb8022ed9703..9099b6b368c94 100644 --- a/consensus/core/src/round_prober.rs +++ b/consensus/core/src/round_prober.rs @@ -159,7 +159,7 @@ impl RoundProber { .collect::<Vec<_>>(); let last_proposed_round = local_highest_accepted_rounds[own_index]; - // For our own index, the highest recieved & accepted round is our last + // For our own index, the highest received & accepted round is our last // accepted round or our last proposed round. highest_received_rounds[own_index] = self.core_thread_dispatcher.highest_received_rounds(); highest_accepted_rounds[own_index] = local_highest_accepted_rounds; diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs index 92c2895fe580d..3b6d1c4f4fa9f 100644 --- a/consensus/core/src/synchronizer.rs +++ b/consensus/core/src/synchronizer.rs @@ -710,6 +710,7 @@ impl Synchronizer Synchronizer Synchronizer) -> Arc