Fix flakiness in test_reconfig_with_committee_change_stress (#18182)
Previously, the test could fail because newly created validators might
not yet have reached the current epoch when asked to terminate the epoch
early. This could leave the committee without a quorum for end-of-epoch.

The fix is to wait for newly spawned validators to sync to the current
epoch before triggering reconfiguration.
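
A minimal sketch of the pattern the fix applies in the stress test. The wait and trigger methods are the test-cluster helpers shown in the diffs below; the wrapper function itself is hypothetical, and imports of TestCluster and SuiNodeHandle are elided:

use std::time::Duration;

// Hypothetical wrapper: wait for two freshly spawned validators to reach
// `cur_epoch` before asking the cluster to reconfigure again.
async fn wait_then_reconfigure(
    test_cluster: &TestCluster,
    handle1: &SuiNodeHandle,
    handle2: &SuiNodeHandle,
    cur_epoch: u64,
) {
    // Without this wait, the new validators may still be on an older epoch
    // when the epoch is terminated early, costing the committee its quorum.
    tokio::join!(
        test_cluster.wait_for_epoch_on_node(handle1, Some(cur_epoch), Duration::from_secs(60)),
        test_cluster.wait_for_epoch_on_node(handle2, Some(cur_epoch), Duration::from_secs(60)),
    );
    test_cluster.trigger_reconfiguration().await;
}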
mystenmark authored Jun 11, 2024
1 parent 0c845e7 commit 398cd16
Showing 4 changed files with 74 additions and 47 deletions.
5 changes: 5 additions & 0 deletions crates/sui-core/src/consensus_adapter.rs
@@ -378,6 +378,9 @@ impl ConsensusAdapter {
recovered.len()
);
for transaction in recovered {
if transaction.is_end_of_publish() {
info!(epoch=?epoch_store.epoch(), "Submitting EndOfPublish message to consensus");
}
self.submit_unchecked(&[transaction], epoch_store);
}
}
@@ -879,6 +882,7 @@ impl ConsensusAdapter {
};
if send_end_of_publish {
// sending message outside of any locks scope
info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
if let Err(err) = self.submit(
ConsensusTransaction::new_end_of_publish(self.authority),
None,
@@ -992,6 +996,7 @@ impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
// reconfig_guard lock is dropped here.
};
if send_end_of_publish {
info!(epoch=?epoch_store.epoch(), "Sending EndOfPublish message to consensus");
if let Err(err) = self.submit(
ConsensusTransaction::new_end_of_publish(self.authority),
None,
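
The added log lines use tracing's structured-field syntax, where `field = ?value` captures the field with its Debug formatting. A standalone sketch of the same pattern (the helper function is illustrative, not part of the commit):

use tracing::info;

fn log_end_of_publish(epoch: u64) {
    // `epoch = ?epoch` records the epoch as a structured field via its Debug
    // representation, mirroring the lines added above.
    info!(epoch = ?epoch, "Sending EndOfPublish message to consensus");
}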
9 changes: 9 additions & 0 deletions crates/sui-e2e-tests/tests/reconfiguration_tests.rs
@@ -677,6 +677,8 @@ async fn do_test_reconfig_with_committee_change_stress() {
.build()
.await;

let mut cur_epoch = 0;

while let Some(v1) = candidates.pop() {
let v2 = candidates.pop().unwrap();
execute_add_validator_transactions(&test_cluster, &v1).await;
@@ -703,11 +705,18 @@
}
let handle1 = test_cluster.spawn_new_validator(v1).await;
let handle2 = test_cluster.spawn_new_validator(v2).await;

tokio::join!(
test_cluster.wait_for_epoch_on_node(&handle1, Some(cur_epoch), Duration::from_secs(60)),
test_cluster.wait_for_epoch_on_node(&handle2, Some(cur_epoch), Duration::from_secs(60))
);

test_cluster.trigger_reconfiguration().await;
let committee = test_cluster
.fullnode_handle
.sui_node
.with(|node| node.state().epoch_store_for_testing().committee().clone());
cur_epoch = committee.epoch();
assert_eq!(committee.num_members(), 7);
assert!(committee.authority_exists(&handle1.state().name));
assert!(committee.authority_exists(&handle2.state().name));
105 changes: 59 additions & 46 deletions crates/test-cluster/src/lib.rs
@@ -291,52 +291,6 @@ impl TestCluster {
.unwrap()
}

/// To detect whether the network has reached such state, we use the fullnode as the
/// source of truth, since a fullnode only does epoch transition when the network has
/// done so.
/// If target_epoch is specified, wait until the cluster reaches that epoch.
/// If target_epoch is None, wait until the cluster reaches the next epoch.
/// Note that this function does not guarantee that every node is at the target epoch.
pub async fn wait_for_epoch(&self, target_epoch: Option<EpochId>) -> SuiSystemState {
self.wait_for_epoch_with_timeout(target_epoch, Duration::from_secs(60))
.await
}

pub async fn wait_for_epoch_with_timeout(
&self,
target_epoch: Option<EpochId>,
timeout_dur: Duration,
) -> SuiSystemState {
let mut epoch_rx = self
.fullnode_handle
.sui_node
.with(|node| node.subscribe_to_epoch_change());
let mut state = Option::None;
timeout(timeout_dur, async {
while let Ok(system_state) = epoch_rx.recv().await {
info!("received epoch {}", system_state.epoch());
state = Some(system_state.clone());
match target_epoch {
Some(target_epoch) if system_state.epoch() >= target_epoch => {
return system_state;
}
None => {
return system_state;
}
_ => (),
}
}
unreachable!("Broken reconfig channel");
})
.await
.unwrap_or_else(|_| {
if let Some(state) = state {
panic!("Timed out waiting for cluster to reach epoch {target_epoch:?}. Current epoch: {}", state.epoch());
}
panic!("Timed out waiting for cluster to target epoch {target_epoch:?}")
})
}

pub async fn wait_for_run_with_range_shutdown_signal(&self) -> Option<RunWithRange> {
self.wait_for_run_with_range_shutdown_signal_with_timeout(Duration::from_secs(60))
.await
@@ -430,6 +384,65 @@ impl TestCluster {
info!("reconfiguration complete after {:?}", start.elapsed());
}

/// To detect whether the network has reached such state, we use the fullnode as the
/// source of truth, since a fullnode only does epoch transition when the network has
/// done so.
/// If target_epoch is specified, wait until the cluster reaches that epoch.
/// If target_epoch is None, wait until the cluster reaches the next epoch.
/// Note that this function does not guarantee that every node is at the target epoch.
pub async fn wait_for_epoch(&self, target_epoch: Option<EpochId>) -> SuiSystemState {
self.wait_for_epoch_with_timeout(target_epoch, Duration::from_secs(60))
.await
}

pub async fn wait_for_epoch_on_node(
&self,
handle: &SuiNodeHandle,
target_epoch: Option<EpochId>,
timeout_dur: Duration,
) -> SuiSystemState {
let mut epoch_rx = handle.with(|node| node.subscribe_to_epoch_change());

let mut state = None;
timeout(timeout_dur, async {
let epoch = handle.with(|node| node.state().epoch_store_for_testing().epoch());
if Some(epoch) == target_epoch {
return handle.with(|node| node.state().get_sui_system_state_object_for_testing().unwrap());
}
while let Ok(system_state) = epoch_rx.recv().await {
info!("received epoch {}", system_state.epoch());
state = Some(system_state.clone());
match target_epoch {
Some(target_epoch) if system_state.epoch() >= target_epoch => {
return system_state;
}
None => {
return system_state;
}
_ => (),
}
}
unreachable!("Broken reconfig channel");
})
.await
.unwrap_or_else(|_| {
error!("Timed out waiting for cluster to reach epoch {target_epoch:?}");
if let Some(state) = state {
panic!("Timed out waiting for cluster to reach epoch {target_epoch:?}. Current epoch: {}", state.epoch());
}
panic!("Timed out waiting for cluster to target epoch {target_epoch:?}")
})
}

pub async fn wait_for_epoch_with_timeout(
&self,
target_epoch: Option<EpochId>,
timeout_dur: Duration,
) -> SuiSystemState {
self.wait_for_epoch_on_node(&self.fullnode_handle.sui_node, target_epoch, timeout_dur)
.await
}

pub async fn wait_for_epoch_all_nodes(&self, target_epoch: EpochId) {
let handles: Vec<_> = self
.swarm
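
After this refactor, wait_for_epoch delegates to wait_for_epoch_with_timeout (60-second default), which in turn delegates to the new wait_for_epoch_on_node using the fullnode handle. A hedged usage sketch from a hypothetical test (cluster and validator setup elided; `handle` stands for a SuiNodeHandle such as the one returned by spawn_new_validator):

use std::time::Duration;

// Hypothetical async test body exercising both entry points.
async fn example(test_cluster: &TestCluster, handle: &SuiNodeHandle) {
    // Wait on the fullnode, as callers did before this commit (API unchanged):
    let state = test_cluster.wait_for_epoch(Some(2)).await;
    assert!(state.epoch() >= 2);

    // Wait on a specific validator handle, new in this commit; useful for
    // freshly spawned validators that must catch up before reconfiguration:
    let state = test_cluster
        .wait_for_epoch_on_node(handle, Some(2), Duration::from_secs(60))
        .await;
    assert!(state.epoch() >= 2);
}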
2 changes: 1 addition & 1 deletion scripts/simtest/seed-search.py
@@ -89,7 +89,7 @@ def main(commands):

for i in range(1, args.num_seeds + 1):
next_seed = args.seed_start + i
commands.append(("%s %s" % (binary, args.test), {
commands.append(("%s --exact %s" % (binary, args.test), {
"MSIM_TEST_SEED": "%d" % next_seed,
"RUST_LOG": "off",
}))
