Skip to content

Commit

Permalink
[Checkpoint] wait for checkpoint service to stop during epoch change
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed May 7, 2024
1 parent aa86996 commit 949b084
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 60 deletions.
63 changes: 12 additions & 51 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ pub use crate::checkpoints::metrics::CheckpointMetrics;
use crate::stake_aggregator::{InsertResult, MultiStakeAggregator};
use crate::state_accumulator::StateAccumulator;
use diffy::create_patch;
use futures::future::{select, Either};
use futures::FutureExt;
use itertools::Itertools;
use mysten_metrics::{monitored_scope, spawn_monitored_task, MonitoredFutureExt};
use mysten_metrics::{monitored_scope, MonitoredFutureExt};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use sui_macros::fail_point;
Expand All @@ -29,6 +27,7 @@ use sui_types::base_types::ConciseableName;
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_checkpoint::CheckpointCommitment;
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;
use tokio::task::JoinSet;

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::consensus_handler::SequencedConsensusTransactionKey;
Expand Down Expand Up @@ -61,10 +60,7 @@ use sui_types::messages_consensus::ConsensusTransactionKey;
use sui_types::signature::GenericSignature;
use sui_types::sui_system_state::{SuiSystemState, SuiSystemStateTrait};
use sui_types::transaction::{TransactionDataAPI, TransactionKey, TransactionKind};
use tokio::{
sync::{watch, Notify},
time::timeout,
};
use tokio::{sync::Notify, time::timeout};
use tracing::{debug, error, info, instrument, warn};
use typed_store::traits::{TableSummary, TypedStoreDebug};
use typed_store::Map;
Expand Down Expand Up @@ -860,7 +856,6 @@ pub struct CheckpointBuilder {
effects_store: Arc<dyn EffectsNotifyRead>,
accumulator: Arc<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
exit: watch::Receiver<()>,
metrics: Arc<CheckpointMetrics>,
max_transactions_per_checkpoint: usize,
max_checkpoint_size_bytes: usize,
Expand All @@ -870,7 +865,6 @@ pub struct CheckpointAggregator {
tables: Arc<CheckpointStore>,
epoch_store: Arc<AuthorityPerEpochStore>,
notify: Arc<Notify>,
exit: watch::Receiver<()>,
current: Option<CheckpointSignatureAggregator>,
output: Box<dyn CertifiedCheckpointOutput>,
state: Arc<AuthorityState>,
Expand Down Expand Up @@ -898,7 +892,6 @@ impl CheckpointBuilder {
effects_store: Arc<dyn EffectsNotifyRead>,
accumulator: Arc<StateAccumulator>,
output: Box<dyn CheckpointOutput>,
exit: watch::Receiver<()>,
notify_aggregator: Arc<Notify>,
metrics: Arc<CheckpointMetrics>,
max_transactions_per_checkpoint: usize,
Expand All @@ -912,25 +905,16 @@ impl CheckpointBuilder {
effects_store,
accumulator,
output,
exit,
notify_aggregator,
metrics,
max_transactions_per_checkpoint,
max_checkpoint_size_bytes,
}
}

async fn run(mut self) {
async fn run(self) {
info!("Starting CheckpointBuilder");
'main: loop {
// Check whether an exit signal has been received, if so we break the loop.
// This gives us a chance to exit, in case checkpoint making keeps failing.
match self.exit.has_changed() {
Ok(true) | Err(_) => {
break;
}
Ok(false) => (),
};
let mut last = self
.epoch_store
.last_built_checkpoint_commit_height()
Expand All @@ -953,15 +937,8 @@ impl CheckpointBuilder {
}
}
debug!("Waiting for more checkpoints from consensus after processing {last:?}");
match select(self.exit.changed().boxed(), self.notify.notified().boxed()).await {
Either::Left(_) => {
// break loop on exit signal
break;
}
Either::Right(_) => {}
}
self.notify.notified().await;
}
info!("Shutting down CheckpointBuilder");
}

#[instrument(level = "debug", skip_all, fields(?height))]
Expand Down Expand Up @@ -1463,7 +1440,6 @@ impl CheckpointAggregator {
tables: Arc<CheckpointStore>,
epoch_store: Arc<AuthorityPerEpochStore>,
notify: Arc<Notify>,
exit: watch::Receiver<()>,
output: Box<dyn CertifiedCheckpointOutput>,
state: Arc<AuthorityState>,
metrics: Arc<CheckpointMetrics>,
Expand All @@ -1473,7 +1449,6 @@ impl CheckpointAggregator {
tables,
epoch_store,
notify,
exit,
current,
output,
state,
Expand All @@ -1494,19 +1469,7 @@ impl CheckpointAggregator {
continue;
}

match select(
self.exit.changed().boxed(),
timeout(Duration::from_secs(1), self.notify.notified()).boxed(),
)
.await
{
Either::Left(_) => {
// return on exit signal
info!("Shutting down CheckpointAggregator");
return;
}
Either::Right(_) => {}
}
let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
}
}

Expand Down Expand Up @@ -1927,14 +1890,14 @@ impl CheckpointService {
metrics: Arc<CheckpointMetrics>,
max_transactions_per_checkpoint: usize,
max_checkpoint_size_bytes: usize,
) -> (Arc<Self>, watch::Sender<()> /* The exit sender */) {
) -> (Arc<Self>, JoinSet<()> /* Handle to tasks */) {
info!(
"Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
);
let notify_builder = Arc::new(Notify::new());
let notify_aggregator = Arc::new(Notify::new());

let (exit_snd, exit_rcv) = watch::channel(());
let mut tasks = JoinSet::new();

let builder = CheckpointBuilder::new(
state.clone(),
Expand All @@ -1944,26 +1907,24 @@ impl CheckpointService {
effects_store,
accumulator,
checkpoint_output,
exit_rcv.clone(),
notify_aggregator.clone(),
metrics.clone(),
max_transactions_per_checkpoint,
max_checkpoint_size_bytes,
);

spawn_monitored_task!(builder.run());
tasks.spawn(builder.run());

let aggregator = CheckpointAggregator::new(
checkpoint_store.clone(),
epoch_store.clone(),
notify_aggregator.clone(),
exit_rcv,
certified_checkpoint_output,
state.clone(),
metrics.clone(),
);

spawn_monitored_task!(aggregator.run());
tasks.spawn(aggregator.run());

let last_signature_index = epoch_store
.get_last_checkpoint_signature_index()
Expand All @@ -1977,7 +1938,7 @@ impl CheckpointService {
last_signature_index,
metrics,
});
(service, exit_snd)
(service, tasks)
}

#[cfg(test)]
Expand Down Expand Up @@ -2204,7 +2165,7 @@ mod tests {
let accumulator = StateAccumulator::new(state.get_accumulator_store().clone());

let epoch_store = state.epoch_store_for_testing();
let (checkpoint_service, _exit) = CheckpointService::spawn(
let (checkpoint_service, _tasks) = CheckpointService::spawn(
state.clone(),
checkpoint_store,
epoch_store.clone(),
Expand Down
17 changes: 8 additions & 9 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::{watch, Mutex};
use tokio::task::JoinHandle;
use tokio::task::JoinSet;
use tower::ServiceBuilder;
use tracing::{debug, error, warn};
use tracing::{error_span, info, Instrument};
Expand Down Expand Up @@ -142,10 +143,8 @@ pub struct ValidatorComponents {
consensus_manager: ConsensusManager,
consensus_epoch_data_remover: EpochDataRemover,
consensus_adapter: Arc<ConsensusAdapter>,
// dropping this will eventually stop checkpoint tasks. The receiver side of this channel
// is copied into each checkpoint service task, and they are listening to any change to this
// channel. When the sender is dropped, a change is triggered and those tasks will exit.
checkpoint_service_exit: watch::Sender<()>,
// Keeping the handle to the checkpoint service tasks to shut them down during reconfiguration.
checkpoint_service_tasks: JoinSet<()>,
checkpoint_metrics: Arc<CheckpointMetrics>,
sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
}
Expand Down Expand Up @@ -1178,7 +1177,7 @@ impl SuiNode {
sui_node_metrics: Arc<SuiNodeMetrics>,
sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
) -> Result<ValidatorComponents> {
let (checkpoint_service, checkpoint_service_exit) = Self::start_checkpoint_service(
let (checkpoint_service, checkpoint_service_tasks) = Self::start_checkpoint_service(
config,
consensus_adapter.clone(),
checkpoint_store,
Expand Down Expand Up @@ -1262,7 +1261,7 @@ impl SuiNode {
consensus_manager,
consensus_epoch_data_remover,
consensus_adapter,
checkpoint_service_exit,
checkpoint_service_tasks,
checkpoint_metrics,
sui_tx_validator_metrics,
})
Expand All @@ -1277,7 +1276,7 @@ impl SuiNode {
state_sync_handle: state_sync::Handle,
accumulator: Arc<StateAccumulator>,
checkpoint_metrics: Arc<CheckpointMetrics>,
) -> (Arc<CheckpointService>, watch::Sender<()>) {
) -> (Arc<CheckpointService>, JoinSet<()>) {
let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();

Expand Down Expand Up @@ -1541,14 +1540,14 @@ impl SuiNode {
consensus_manager,
consensus_epoch_data_remover,
consensus_adapter,
checkpoint_service_exit,
mut checkpoint_service_tasks,
checkpoint_metrics,
sui_tx_validator_metrics,
}) = self.validator_components.lock().await.take()
{
info!("Reconfiguring the validator.");
// Stop the old checkpoint service.
drop(checkpoint_service_exit);
checkpoint_service_tasks.shutdown().await;

consensus_manager.shutdown().await;

Expand Down

0 comments on commit 949b084

Please sign in to comment.