diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index 1b8b818b79721..706f8154f49e8 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -256,7 +256,8 @@ where block_manager, // For streaming RPC, Core will be notified when consumer is available. // For non-streaming RPC, there is no way to know so default to true. - !N::Client::SUPPORT_STREAMING, + // When there is only one (this) authority, assume subscriber exists. + !N::Client::SUPPORT_STREAMING || context.committee.size() == 1, commit_observer, core_signals, protocol_keypair, @@ -573,6 +574,105 @@ mod tests { } } + #[rstest] + #[tokio::test(flavor = "current_thread")] + async fn test_small_committee( + #[values(ConsensusNetwork::Anemo, ConsensusNetwork::Tonic)] network_type: ConsensusNetwork, + #[values(1, 2, 3)] num_authorities: usize, + ) { + let db_registry = Registry::new(); + DBMetrics::init(&db_registry); + + let (committee, keypairs) = local_committee_and_keys(0, vec![1; num_authorities]); + let protocol_config: ProtocolConfig = ProtocolConfig::get_for_max_version_UNSAFE(); + + let temp_dirs = (0..num_authorities) + .map(|_| TempDir::new().unwrap()) + .collect::>(); + + let mut output_receivers = Vec::with_capacity(committee.size()); + let mut authorities: Vec = Vec::with_capacity(committee.size()); + let mut boot_counters = vec![0; num_authorities]; + + for (index, _authority_info) in committee.authorities() { + let (authority, receiver) = make_authority( + index, + &temp_dirs[index.value()], + committee.clone(), + keypairs.clone(), + network_type, + boot_counters[index], + protocol_config.clone(), + ) + .await; + boot_counters[index] += 1; + output_receivers.push(receiver); + authorities.push(authority); + } + + const NUM_TRANSACTIONS: u8 = 15; + let mut submitted_transactions = BTreeSet::>::new(); + for i in 0..NUM_TRANSACTIONS { + let txn = vec![i; 16]; + submitted_transactions.insert(txn.clone()); + authorities[i as usize % authorities.len()] + .transaction_client() + .submit(vec![txn]) + .await + .unwrap(); + } + + for receiver in &mut output_receivers { + let mut expected_transactions = submitted_transactions.clone(); + loop { + let committed_subdag = + tokio::time::timeout(Duration::from_secs(1), receiver.recv()) + .await + .unwrap() + .unwrap(); + for b in committed_subdag.blocks { + for txn in b.transactions().iter().map(|t| t.data().to_vec()) { + assert!( + expected_transactions.remove(&txn), + "Transaction not submitted or already seen: {:?}", + txn + ); + } + } + assert_eq!(committed_subdag.reputation_scores_desc, vec![]); + if expected_transactions.is_empty() { + break; + } + } + } + + // Stop authority 0. + let index = committee.to_authority_index(0).unwrap(); + authorities.remove(index.value()).stop().await; + sleep(Duration::from_secs(10)).await; + + // Restart authority 0 and let it run. + let (authority, receiver) = make_authority( + index, + &temp_dirs[index.value()], + committee.clone(), + keypairs.clone(), + network_type, + boot_counters[index], + protocol_config.clone(), + ) + .await; + boot_counters[index] += 1; + output_receivers[index] = receiver; + authorities.insert(index.value(), authority); + sleep(Duration::from_secs(10)).await; + + // Stop all authorities and exit. + for authority in authorities { + authority.stop().await; + } + } + #[rstest] #[tokio::test(flavor = "current_thread")] async fn test_amnesia_recovery_success( diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index b798d367339f9..70c2909483a5a 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -131,10 +131,7 @@ impl Core { .with_pipeline(true) .build(); - // Recover the last proposed block - let last_proposed_block = dag_state - .read() - .get_last_block_for_authority(context.own_index); + let last_proposed_block = dag_state.read().get_last_proposed_block(); // Recover the last included ancestor rounds based on the last proposed block. That will allow // to perform the next block proposal by using ancestor blocks of higher rounds and avoid @@ -207,10 +204,6 @@ impl Core { "Waiting for {} ms while recovering ancestors from storage", wait_ms ); - println!( - "Waiting for {} ms while recovering ancestors from storage", - wait_ms - ); std::thread::sleep(Duration::from_millis(wait_ms)); } // Recover the last available quorum to correctly advance the threshold clock. @@ -223,13 +216,10 @@ impl Core { { last_proposed_block } else { - let last_proposed_block = self - .dag_state - .read() - .get_last_block_for_authority(self.context.own_index); + let last_proposed_block = self.dag_state.read().get_last_proposed_block(); if self.should_propose() { - assert!(last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher that genesis should have been produced during recovery"); + assert!(last_proposed_block.round() > GENESIS_ROUND, "At minimum a block of round higher than genesis should have been produced during recovery"); } // if no new block proposed then just re-broadcast the last proposed one to ensure liveness. @@ -1001,9 +991,7 @@ impl Core { } fn last_proposed_block(&self) -> VerifiedBlock { - self.dag_state - .read() - .get_last_block_for_authority(self.context.own_index) + self.dag_state.read().get_last_proposed_block() } } diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index 1fc9a30a872e3..2c09b29655710 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -452,7 +452,13 @@ impl DagState { blocks.first().cloned().unwrap() } - /// Retrieves the last block proposed for the specified `authority`. If no block is found in cache + /// Gets the last proposed block from this authority. + /// If no block is proposed yet, returns the genesis block. + pub(crate) fn get_last_proposed_block(&self) -> VerifiedBlock { + self.get_last_block_for_authority(self.context.own_index) + } + + /// Retrieves the last accepted block from the specified `authority`. If no block is found in cache /// then the genesis block is returned as no other block has been received from that authority. pub(crate) fn get_last_block_for_authority(&self, authority: AuthorityIndex) -> VerifiedBlock { if let Some(last) = self.recent_refs_by_authority[authority].last() { @@ -2209,12 +2215,7 @@ mod test { .find(|block| block.author() == context.own_index) .unwrap(); - assert_eq!( - dag_state - .read() - .get_last_block_for_authority(context.own_index), - my_genesis - ); + assert_eq!(dag_state.read().get_last_proposed_block(), my_genesis); } // WHEN adding some blocks for authorities, only the last ones should be returned diff --git a/consensus/core/src/round_prober.rs b/consensus/core/src/round_prober.rs index 9fb8022ed9703..9099b6b368c94 100644 --- a/consensus/core/src/round_prober.rs +++ b/consensus/core/src/round_prober.rs @@ -159,7 +159,7 @@ impl RoundProber { .collect::>(); let last_proposed_round = local_highest_accepted_rounds[own_index]; - // For our own index, the highest recieved & accepted round is our last + // For our own index, the highest received & accepted round is our last // accepted round or our last proposed round. highest_received_rounds[own_index] = self.core_thread_dispatcher.highest_received_rounds(); highest_accepted_rounds[own_index] = local_highest_accepted_rounds; diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs index 92c2895fe580d..3b6d1c4f4fa9f 100644 --- a/consensus/core/src/synchronizer.rs +++ b/consensus/core/src/synchronizer.rs @@ -710,6 +710,7 @@ impl Synchronizer Synchronizer Synchronizer) -> Arc