Skip to content

Commit

Permalink
WIP even more synchronization
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark committed Oct 15, 2024
1 parent 62e731c commit 904d92d
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 8 deletions.
19 changes: 14 additions & 5 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -819,9 +819,12 @@ impl CheckpointBuilder {
}
}

async fn run(mut self, startup_wait: tokio::sync::oneshot::Receiver<()>) {
async fn run(
mut self,
startup_wait: tokio::sync::oneshot::Receiver<tokio::sync::oneshot::Sender<()>>,
) {
info!("CheckpointBuilder waiting for startup signal");
startup_wait.await.ok();
let mut notify = Some(startup_wait.await.unwrap());
info!("Starting CheckpointBuilder");
loop {
// Check whether an exit signal has been received, if so we break the loop.
Expand All @@ -835,6 +838,10 @@ impl CheckpointBuilder {

self.maybe_build_checkpoints().await;

if let Some(notify) = notify.take() {
notify.send(()).unwrap();
}

match select(self.exit.changed().boxed(), self.notify.notified().boxed()).await {
Either::Left(_) => {
// break loop on exit signal
Expand Down Expand Up @@ -2146,8 +2153,8 @@ impl CheckpointService {
max_checkpoint_size_bytes: usize,
) -> (
Arc<Self>,
watch::Sender<()>, /* The exit sender */
tokio::sync::oneshot::Sender<()>, /* builder start-up sender */
watch::Sender<()>, /* The exit sender */
tokio::sync::oneshot::Sender<tokio::sync::oneshot::Sender<()>>, /* builder start-up sender */
) {
info!(
"Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
Expand Down Expand Up @@ -2456,7 +2463,9 @@ mod tests {
3,
100_000,
);
startup.send(()).unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
startup.send(tx).unwrap();
rx.await.unwrap();

checkpoint_service
.write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
Expand Down
6 changes: 5 additions & 1 deletion crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ pub fn checkpoint_service_for_testing(state: Arc<AuthorityState>) -> Arc<Checkpo
3,
100_000,
);
startup.send(()).unwrap();
let (tx, rx) = tokio::sync::oneshot::channel();
startup.send(tx).unwrap();
tokio::task::spawn(async move {
rx.await.unwrap();
});
checkpoint_service
}

Expand Down
7 changes: 5 additions & 2 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1333,10 +1333,13 @@ impl SuiNode {
.await;

info!("consensus manager started");
let (tx, rx) = tokio::sync::oneshot::channel();
startup_sender
.send(())
.send(tx)
.expect("Failed to send startup signal");

rx.await.expect("Failed to receive startup signal");

if epoch_store.authenticator_state_enabled() {
Self::start_jwk_updater(
config,
Expand Down Expand Up @@ -1371,7 +1374,7 @@ impl SuiNode {
) -> (
Arc<CheckpointService>,
watch::Sender<()>,
tokio::sync::oneshot::Sender<()>,
tokio::sync::oneshot::Sender<tokio::sync::oneshot::Sender<()>>,
) {
let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms();
let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms();
Expand Down

0 comments on commit 904d92d

Please sign in to comment.