From fa3ecc31e7c5b9837f625b9504f1e4ca5df815c5 Mon Sep 17 00:00:00 2001
From: William Smith
Date: Fri, 3 Jan 2025 15:39:08 -0500
Subject: [PATCH] [core] remove checkpoint executor unit tests (#20624)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Description

The checkpoint executor unit tests do not provide sufficient signal compared to the end-to-end simtests to warrant their continued existence. As the system has evolved, the assumptions in these tests have all been broken at some point or another, causing them to fail or become flaky. Removing them.

## Test plan

👀

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:
- [ ] REST API:
---
 .../checkpoints/checkpoint_executor/mod.rs    |   3 -
 .../checkpoints/checkpoint_executor/tests.rs  | 404 ------------------
 2 files changed, 407 deletions(-)
 delete mode 100644 crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs

diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
index 40824982d3453..07e196b9999a5 100644
--- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
+++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
@@ -70,9 +70,6 @@ use crate::{
 mod data_ingestion_handler;
 pub mod metrics;
 
-#[cfg(test)]
-pub(crate) mod tests;
-
 type CheckpointExecutionBuffer = FuturesOrdered<
     JoinHandle<(
         VerifiedCheckpoint,

diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs
deleted file mode 100644
index 1771548954152..0000000000000
--- a/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs
+++ /dev/null
@@ -1,404 +0,0 @@
-// Copyright (c) Mysten Labs, Inc.
-// SPDX-License-Identifier: Apache-2.0
-
-use super::*;
-use sui_config::node::ExpensiveSafetyCheckConfig;
-use sui_types::gas::GasCostSummary;
-use tempfile::tempdir;
-
-use std::{sync::Arc, time::Duration};
-
-use crate::authority::epoch_start_configuration::{EpochFlag, EpochStartConfiguration};
-use broadcast::{Receiver, Sender};
-use sui_types::committee::ProtocolVersion;
-use sui_types::messages_checkpoint::{ECMHLiveObjectSetDigest, EndOfEpochData, VerifiedCheckpoint};
-use sui_types::supported_protocol_versions::SupportedProtocolVersions;
-use tokio::{sync::broadcast, time::timeout};
-
-use crate::authority::test_authority_builder::TestAuthorityBuilder;
-use crate::{
-    authority::AuthorityState, checkpoints::CheckpointStore, state_accumulator::StateAccumulator,
-};
-use sui_swarm_config::test_utils::{empty_contents, CommitteeFixture};
-use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemState;
-use typed_store::Map;
-
-/// Test that checkpoint execution correctly signals end of epoch after
-/// receiving last checkpoint of epoch, then resumes executing checkpoints
-/// from the next epoch if called after reconfig
-///
-/// TODO(william) disabling reconfig unit tests here for now until we can work
-/// on correctly inserting transactions, especially the change_epoch tx. As it stands, this
-/// is better tested in existing reconfig simtests
-#[tokio::test]
-#[ignore]
-pub async fn test_checkpoint_executor_cross_epoch() {
-    let buffer_size = 10;
-    let num_to_sync_per_epoch = buffer_size * 2;
-    let tempdir = tempdir().unwrap();
-    let checkpoint_store = CheckpointStore::new(tempdir.path());
-
-    let (authority_state, mut executor, accumulator, checkpoint_sender, first_committee): (
-        Arc<AuthorityState>,
-        CheckpointExecutor,
-        Arc<StateAccumulator>,
-        Sender<VerifiedCheckpoint>,
-        CommitteeFixture,
-    ) = init_executor_test(buffer_size, checkpoint_store.clone()).await;
-
-    let epoch_store = authority_state.epoch_store_for_testing();
-    let epoch = epoch_store.epoch();
-    assert_eq!(epoch, 0);
-
-    assert!(checkpoint_store
-        .get_highest_executed_checkpoint_seq_number()
-        .unwrap()
-        .is_none());
-
-    // sync 20 checkpoints
-    let cold_start_checkpoints = sync_new_checkpoints(
-        &checkpoint_store,
-        &checkpoint_sender,
-        num_to_sync_per_epoch,
-        None,
-        &first_committee,
-    );
-
-    // sync end of epoch checkpoint
-    let last_executed_checkpoint = cold_start_checkpoints.last().cloned().unwrap();
-    let (end_of_epoch_0_checkpoint, second_committee) = sync_end_of_epoch_checkpoint(
-        authority_state.clone(),
-        &checkpoint_store,
-        &checkpoint_sender,
-        last_executed_checkpoint.clone(),
-        &first_committee,
-    )
-    .await;
-
-    // sync 20 more checkpoints
-    let next_epoch_checkpoints = sync_new_checkpoints(
-        &checkpoint_store,
-        &checkpoint_sender,
-        num_to_sync_per_epoch,
-        Some(end_of_epoch_0_checkpoint.clone()),
-        &second_committee,
-    );
-
-    authority_state
-        .get_checkpoint_store()
-        .epoch_last_checkpoint_map
-        .insert(
-            &end_of_epoch_0_checkpoint.epoch,
-            end_of_epoch_0_checkpoint.sequence_number(),
-        )
-        .unwrap();
-    authority_state
-        .get_checkpoint_store()
-        .certified_checkpoints
-        .insert(
-            end_of_epoch_0_checkpoint.sequence_number(),
-            end_of_epoch_0_checkpoint.serializable_ref(),
-        )
-        .unwrap();
-    // sync end of epoch checkpoint
-    let last_executed_checkpoint = next_epoch_checkpoints.last().cloned().unwrap();
-    let (_end_of_epoch_1_checkpoint, _third_committee) = sync_end_of_epoch_checkpoint(
-        authority_state.clone(),
-        &checkpoint_store,
-        &checkpoint_sender,
-        last_executed_checkpoint.clone(),
-        &second_committee,
-    )
-    .await;
-
-    // Ensure root state hash for epoch does not exist before we close epoch
-    assert!(authority_state
-        .get_accumulator_store()
-        .get_root_state_accumulator_for_epoch(0)
-        .unwrap()
-        .is_none());
-
-    // Ensure executor reaches end of epoch in a timely manner
-    timeout(Duration::from_secs(5), async {
-        executor.run_epoch(epoch_store.clone(), None).await;
-    })
-    .await
-    .unwrap();
-
-    // We should have synced up to epoch boundary
-    assert_eq!(
-        checkpoint_store
-            .get_highest_executed_checkpoint_seq_number()
-            .unwrap()
-            .unwrap(),
-        num_to_sync_per_epoch as u64,
-    );
-
-    let first_epoch = 0;
-
-    // Ensure root state hash for epoch exists at end of epoch
-    authority_state
-        .get_accumulator_store()
-        .get_root_state_accumulator_for_epoch(first_epoch)
-        .unwrap()
-        .expect("root state hash for epoch should exist");
-
-    let system_state = EpochStartSystemState::new_for_testing_with_epoch(1);
-
-    let new_epoch_store = authority_state
-        .reconfigure(
-            &authority_state.epoch_store_for_testing(),
-            SupportedProtocolVersions::SYSTEM_DEFAULT,
-            second_committee.committee().clone(),
-            EpochStartConfiguration::new(
-                system_state,
-                Default::default(),
-                authority_state.get_object_store(),
-                EpochFlag::default_flags_for_new_epoch(&authority_state.config),
-            )
-            .unwrap(),
-            accumulator,
-            &ExpensiveSafetyCheckConfig::default(),
-        )
-        .await
-        .unwrap();
-
-    // checkpoint execution should resume starting at checkpoints
-    // of next epoch
-    timeout(Duration::from_secs(5), async {
-        executor.run_epoch(new_epoch_store.clone(), None).await;
-    })
-    .await
-    .unwrap();
-
-    assert_eq!(
-        checkpoint_store
-            .get_highest_executed_checkpoint_seq_number()
-            .unwrap()
-            .unwrap(),
-        2 * num_to_sync_per_epoch as u64 + 1,
-    );
-
-    let second_epoch = 1;
-    assert!(second_epoch == new_epoch_store.epoch());
-
-    authority_state
-        .get_accumulator_store()
-        .get_root_state_accumulator_for_epoch(second_epoch)
-        .unwrap()
-        .expect("root state hash for epoch should exist");
-}
-
-/// Test that if we crash at end of epoch / during reconfig, we recover on startup
-/// by starting at the old epoch and immediately retrying reconfig
-///
-/// TODO(william) disabling reconfig unit tests here for now until we can work
-/// on correctly inserting transactions, especially the change_epoch tx. As it stands, this
-/// is better tested in existing reconfig simtests
-#[tokio::test]
-#[ignore]
-pub async fn test_reconfig_crash_recovery() {
-    let tempdir = tempdir().unwrap();
-    let checkpoint_store = CheckpointStore::new(tempdir.path());
-
-    // new Node (syncing from checkpoint 0)
-    let (authority_state, mut executor, accumulator, checkpoint_sender, first_committee): (
-        Arc<AuthorityState>,
-        CheckpointExecutor,
-        Arc<StateAccumulator>,
-        Sender<VerifiedCheckpoint>,
-        CommitteeFixture,
-    ) = init_executor_test(
-        10, /* StateSync -> Executor channel buffer size */
-        checkpoint_store.clone(),
-    )
-    .await;
-
-    assert!(checkpoint_store
-        .get_highest_executed_checkpoint_seq_number()
-        .unwrap()
-        .is_none());
-
-    // sync 1 checkpoint
-    let checkpoint = sync_new_checkpoints(
-        &checkpoint_store,
-        &checkpoint_sender,
-        1,
-        None,
-        &first_committee,
-    )
-    .pop()
-    .unwrap();
-
-    // sync end of epoch checkpoint
-    let (end_of_epoch_checkpoint, second_committee) = sync_end_of_epoch_checkpoint(
-        authority_state.clone(),
-        &checkpoint_store,
-        &checkpoint_sender,
-        checkpoint,
-        &first_committee,
-    )
-    .await;
-    // sync 1 more checkpoint
-    let _next_epoch_checkpoints = sync_new_checkpoints(
-        &checkpoint_store,
-        &checkpoint_sender,
-        1,
-        Some(end_of_epoch_checkpoint.clone()),
-        &second_committee,
-    );
-
-    timeout(Duration::from_secs(1), async {
-        executor
-            .run_epoch(authority_state.epoch_store_for_testing().clone(), None)
-            .await;
-    })
-    .await
-    .unwrap();
-
-    // Check that we stopped execution at epoch boundary
-    assert_eq!(
-        checkpoint_store
-            .get_highest_executed_checkpoint_seq_number()
-            .unwrap()
-            .unwrap(),
-        *end_of_epoch_checkpoint.sequence_number(),
-    );
-
-    // Drop and re-instantiate checkpoint executor without performing reconfig. This
-    // is logically equivalent to reconfig crashing and the node restarting, in which
-    // case executor should be able to infer that, rather than beginning execution of
-    // the next epoch, we should immediately exit so that reconfig can be reattempted.
-    drop(executor);
-    let mut executor = CheckpointExecutor::new_for_tests(
-        checkpoint_sender.subscribe(),
-        checkpoint_store.clone(),
-        authority_state.clone(),
-        accumulator.clone(),
-    );
-
-    timeout(Duration::from_millis(200), async {
-        executor
-            .run_epoch(authority_state.epoch_store_for_testing().clone(), None)
-            .await;
-    })
-    .await
-    .unwrap();
-
-    // Check that we have still not gone beyond epoch boundary
-    assert_eq!(
-        checkpoint_store
-            .get_highest_executed_checkpoint_seq_number()
-            .unwrap()
-            .unwrap(),
-        *end_of_epoch_checkpoint.sequence_number(),
-    );
-}
-
-async fn init_executor_test(
-    buffer_size: usize,
-    store: Arc<CheckpointStore>,
-) -> (
-    Arc<AuthorityState>,
-    CheckpointExecutor,
-    Arc<StateAccumulator>,
-    Sender<VerifiedCheckpoint>,
-    CommitteeFixture,
-) {
-    let network_config =
-        sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir().build();
-    let state = TestAuthorityBuilder::new()
-        .with_network_config(&network_config, 0)
-        .build()
-        .await;
-
-    let (checkpoint_sender, _): (Sender<VerifiedCheckpoint>, Receiver<VerifiedCheckpoint>) =
-        broadcast::channel(buffer_size);
-    let epoch_store = state.epoch_store_for_testing();
-
-    let accumulator =
-        StateAccumulator::new_for_tests(state.get_accumulator_store().clone(), &epoch_store);
-    let accumulator = Arc::new(accumulator);
-
-    let executor = CheckpointExecutor::new_for_tests(
-        checkpoint_sender.subscribe(),
-        store.clone(),
-        state.clone(),
-        accumulator.clone(),
-    );
-    (
-        state,
-        executor,
-        accumulator,
-        checkpoint_sender,
-        CommitteeFixture::from_network_config(&network_config),
-    )
-}
-
-/// Creates and simulates syncing of a new checkpoint by StateSync, i.e. new
-/// checkpoint is persisted, along with its contents, highest synced checkpoint
-/// watermark is updated, and message is broadcasted notifying of the newly synced
-/// checkpoint. Returns created checkpoints
-fn sync_new_checkpoints(
-    checkpoint_store: &CheckpointStore,
-    sender: &Sender<VerifiedCheckpoint>,
-    number_of_checkpoints: usize,
-    previous_checkpoint: Option<VerifiedCheckpoint>,
-    committee: &CommitteeFixture,
-) -> Vec<VerifiedCheckpoint> {
-    let (ordered_checkpoints, _, _sequence_number_to_digest, _checkpoints) =
-        committee.make_empty_checkpoints(number_of_checkpoints, previous_checkpoint);
-
-    for checkpoint in ordered_checkpoints.iter() {
-        sync_checkpoint(checkpoint, checkpoint_store, sender);
-    }
-
-    ordered_checkpoints
-}
-
-async fn sync_end_of_epoch_checkpoint(
-    authority_state: Arc<AuthorityState>,
-    checkpoint_store: &CheckpointStore,
-    sender: &Sender<VerifiedCheckpoint>,
-    previous_checkpoint: VerifiedCheckpoint,
-    committee: &CommitteeFixture,
-) -> (VerifiedCheckpoint, CommitteeFixture) {
-    let new_committee =
-        CommitteeFixture::generate(rand::rngs::OsRng, committee.committee().epoch + 1, 4);
-    let (_sequence_number, _digest, checkpoint) = committee.make_end_of_epoch_checkpoint(
-        previous_checkpoint,
-        Some(EndOfEpochData {
-            next_epoch_committee: new_committee.committee().voting_rights.clone(),
-            next_epoch_protocol_version: ProtocolVersion::MIN,
-            epoch_commitments: vec![ECMHLiveObjectSetDigest::default().into()],
-        }),
-    );
-    authority_state
-        .create_and_execute_advance_epoch_tx(
-            &authority_state.epoch_store_for_testing().clone(),
-            &GasCostSummary::new(0, 0, 0, 0),
-            *checkpoint.sequence_number(),
-            0, // epoch_start_timestamp_ms
-        )
-        .await
-        .expect("Failed to create and execute advance epoch tx");
-    sync_checkpoint(&checkpoint, checkpoint_store, sender);
-    (checkpoint, new_committee)
-}
-
-fn sync_checkpoint(
-    checkpoint: &VerifiedCheckpoint,
-    checkpoint_store: &CheckpointStore,
-    sender: &Sender<VerifiedCheckpoint>,
-) {
-    checkpoint_store
-        .insert_verified_checkpoint(checkpoint)
-        .unwrap();
-    checkpoint_store
-        .insert_checkpoint_contents(empty_contents().into_inner().into_checkpoint_contents())
-        .unwrap();
-    checkpoint_store
-        .update_highest_synced_checkpoint(checkpoint)
-        .unwrap();
-    sender.send(checkpoint.clone()).unwrap();
-}