Skip to content

Commit

Permalink
[Checkpoint] wait for checkpoint service to stop during reconfig (#17556
Browse files Browse the repository at this point in the history
)

## Description 

Currently during reconfig, `CheckpointService` tasks, including
`CheckpointBuilder` and `CheckpointAggregator`, are [notified to shut
down](https://github.com/MystenLabs/sui/blob/b1540cdb2019f0501d2cd7f6dad208f65a66b6d2/crates/sui-node/src/lib.rs#L1551).
But reconfig does not wait for them to finish shutting down. There can
be a race between the reconfig loop proceeding to [drop the epoch db
handle](https://github.com/MystenLabs/sui/blob/b1540cdb2019f0501d2cd7f6dad208f65a66b6d2/crates/sui-node/src/lib.rs#L1633),
while `CheckpointBuilder` tries to [read from epoch db when creating a
new
checkpoint](https://github.com/MystenLabs/sui/blob/b1540cdb2019f0501d2cd7f6dad208f65a66b6d2/crates/sui-core/src/checkpoints/mod.rs#L1378).
The race can result in panics.

## Test plan 

CI
Simulation

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
mwtian authored Oct 18, 2024
1 parent ae86147 commit 8d2bb84
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 29 deletions.
24 changes: 16 additions & 8 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use diffy::create_patch;
use futures::future::{select, Either};
use futures::FutureExt;
use itertools::Itertools;
use mysten_metrics::{monitored_scope, spawn_monitored_task, MonitoredFutureExt};
use mysten_metrics::{monitored_future, monitored_scope, MonitoredFutureExt};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use sui_macros::fail_point;
Expand Down Expand Up @@ -65,6 +65,7 @@ use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
use sui_types::transaction::{TransactionDataAPI, TransactionKey, TransactionKind};
use tokio::{
sync::{watch, Notify},
task::JoinSet,
time::timeout,
};
use tracing::{debug, error, info, instrument, warn};
Expand Down Expand Up @@ -2240,7 +2241,11 @@ impl CheckpointService {
metrics: Arc<CheckpointMetrics>,
max_transactions_per_checkpoint: usize,
max_checkpoint_size_bytes: usize,
) -> (Arc<Self>, watch::Sender<()> /* The exit sender */) {
) -> (
Arc<Self>,
watch::Sender<()>, /* The exit sender */
JoinSet<()>, /* Handle to tasks */
) {
info!(
"Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
);
Expand All @@ -2249,6 +2254,8 @@ impl CheckpointService {

let (exit_snd, exit_rcv) = watch::channel(());

let mut tasks = JoinSet::new();

let builder = CheckpointBuilder::new(
state.clone(),
checkpoint_store.clone(),
Expand All @@ -2263,9 +2270,10 @@ impl CheckpointService {
max_transactions_per_checkpoint,
max_checkpoint_size_bytes,
);

let epoch_store_clone = epoch_store.clone();
spawn_monitored_task!(epoch_store_clone.within_alive_epoch(builder.run()));
tasks.spawn(monitored_future!(async move {
let _ = epoch_store_clone.within_alive_epoch(builder.run()).await;
}));

let aggregator = CheckpointAggregator::new(
checkpoint_store.clone(),
Expand All @@ -2276,8 +2284,7 @@ impl CheckpointService {
state.clone(),
metrics.clone(),
);

spawn_monitored_task!(aggregator.run());
tasks.spawn(monitored_future!(aggregator.run()));

let last_signature_index = epoch_store
.get_last_checkpoint_signature_index()
Expand All @@ -2291,7 +2298,8 @@ impl CheckpointService {
last_signature_index,
metrics,
});
(service, exit_snd)

(service, exit_snd, tasks)
}

#[cfg(test)]
Expand Down Expand Up @@ -2535,7 +2543,7 @@ mod tests {
&epoch_store,
));

let (checkpoint_service, _exit) = CheckpointService::spawn(
let (checkpoint_service, _exit_sender, _tasks) = CheckpointService::spawn(
state.clone(),
checkpoint_store,
epoch_store.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<Checkpo
));
let (certified_output, _certified_result) = mpsc::channel::<CertifiedCheckpointSummary>(10);

let (checkpoint_service, _) = CheckpointService::spawn(
let (checkpoint_service, _, _) = CheckpointService::spawn(
state.clone(),
state.get_checkpoint_store().clone(),
epoch_store.clone(),
Expand Down
63 changes: 43 additions & 20 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use arc_swap::ArcSwap;
use fastcrypto_zkp::bn254::zk_login::JwkId;
use fastcrypto_zkp::bn254::zk_login::OIDCProvider;
use futures::TryFutureExt;
use mysten_common::debug_fatal;
use prometheus::Registry;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt;
Expand Down Expand Up @@ -45,10 +46,9 @@ use sui_types::messages_consensus::AuthorityCapabilitiesV2;
use sui_types::sui_system_state::SuiSystemState;
use tap::tap::TapFallible;
use tokio::runtime::Handle;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::{watch, Mutex};
use tokio::task::JoinHandle;
use tokio::sync::{broadcast, mpsc, watch, Mutex};
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::timeout;
use tower::ServiceBuilder;
use tracing::{debug, error, warn};
use tracing::{error_span, info, Instrument};
Expand Down Expand Up @@ -150,10 +150,12 @@ pub struct ValidatorComponents {
consensus_manager: ConsensusManager,
consensus_store_pruner: ConsensusStorePruner,
consensus_adapter: Arc<ConsensusAdapter>,
// dropping this will eventually stop checkpoint tasks. The receiver side of this channel
// is copied into each checkpoint service task, and they are listening to any change to this
// channel. When the sender is dropped, a change is triggered and those tasks will exit.
// Sending to the channel or dropping this will eventually stop checkpoint tasks.
// The receiver side of this channel is copied into each checkpoint service task,
// and they are listening to any change to this channel.
checkpoint_service_exit: watch::Sender<()>,
// Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
checkpoint_service_tasks: JoinSet<()>,
checkpoint_metrics: Arc<CheckpointMetrics>,
sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}
Expand Down Expand Up @@ -1289,16 +1291,17 @@ impl SuiNode {
sui_node_metrics: Arc<SuiNodeMetrics>,
sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
) -> Result<ValidatorComponents> {
let (checkpoint_service, checkpoint_service_exit) = Self::start_checkpoint_service(
config,
consensus_adapter.clone(),
checkpoint_store,
epoch_store.clone(),
state.clone(),
state_sync_handle,
accumulator,
checkpoint_metrics.clone(),
);
let (checkpoint_service, checkpoint_service_exit, checkpoint_service_tasks) =
Self::start_checkpoint_service(
config,
consensus_adapter.clone(),
checkpoint_store,
epoch_store.clone(),
state.clone(),
state_sync_handle,
accumulator,
checkpoint_metrics.clone(),
);

// create a new map that gets injected into both the consensus handler and the consensus adapter
// the consensus handler will write values forwarded from consensus, and the consensus adapter
Expand Down Expand Up @@ -1376,6 +1379,7 @@ impl SuiNode {
consensus_store_pruner,
consensus_adapter,
checkpoint_service_exit,
checkpoint_service_tasks,
checkpoint_metrics,
sui_tx_validator_metrics,
})
Expand All @@ -1390,7 +1394,7 @@ impl SuiNode {
state_sync_handle: state_sync::Handle,
accumulator: Weak<StateAccumulator>,
checkpoint_metrics: Arc<CheckpointMetrics>,
) -> (Arc<CheckpointService>, watch::Sender<()>) {
) -> (Arc<CheckpointService>, watch::Sender<()>, JoinSet<()>) {
let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

Expand Down Expand Up @@ -1685,15 +1689,33 @@ impl SuiNode {
consensus_store_pruner,
consensus_adapter,
checkpoint_service_exit,
mut checkpoint_service_tasks,
checkpoint_metrics,
sui_tx_validator_metrics,
}) = self.validator_components.lock().await.take()
{
info!("Reconfiguring the validator.");
// Stop the old checkpoint service.
drop(checkpoint_service_exit);
// Stop the old checkpoint service and wait for them to finish.
let _ = checkpoint_service_exit.send(());
let wait_result = timeout(Duration::from_secs(5), async move {
while let Some(result) = checkpoint_service_tasks.join_next().await {
if let Err(err) = result {
if err.is_panic() {
std::panic::resume_unwind(err.into_panic());
}
warn!("Error in checkpoint service task: {:?}", err);
}
}
})
.await;
if wait_result.is_err() {
debug_fatal!("Timed out waiting for checkpoint service tasks to finish.");
} else {
info!("Checkpoint service has shut down.");
}

consensus_manager.shutdown().await;
info!("Consensus has shut down.");

let new_epoch_store = self
.reconfigure_state(
Expand All @@ -1704,6 +1726,7 @@ impl SuiNode {
accumulator.clone(),
)
.await;
info!("Epoch store finished reconfiguration.");

// No other components should be holding a strong reference to state accumulator
// at this point. Confirm here before we swap in the new accumulator.
Expand Down

0 comments on commit 8d2bb84

Please sign in to comment.