From 6e57a80e041deceec6ce0960765f60ce1d95333f Mon Sep 17 00:00:00 2001 From: matan-starkware <97523054+matan-starkware@users.noreply.github.com> Date: Mon, 6 Jan 2025 19:33:32 +0200 Subject: [PATCH] fix(sequencing): cende panics if late (#3117) The cende write task was written to send its boolean result over a oneshot channel and to panic if that send failed. The consumer is meant to check only once for completion and to drop the receiver if the task is still incomplete. This leads to a panic if cende finishes late and sends after the receiver has been dropped. Return the result through the task's JoinHandle instead, and also use AbortOnDropHandle to shut down the cende task once we no longer wait for it. --- Cargo.lock | 2 ++ .../papyrus_consensus_orchestrator/Cargo.toml | 2 +- .../src/cende/mod.rs | 29 ++++++------------- .../src/sequencer_consensus_context.rs | 28 +++++++++++------- .../src/sequencer_consensus_context_test.rs | 23 ++++++--------- 5 files changed, 38 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33be2839c6..f3f611fb9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11430,6 +11430,8 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "tokio", ] diff --git a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml index 1a1f548ec2..a9e4a2ee2d 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml +++ b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml @@ -24,7 +24,7 @@ starknet_api.workspace = true starknet_batcher_types = { workspace = true, features = ["testing"] } starknet_state_sync_types = { workspace = true, features = ["testing"] } tokio = { workspace = true, features = ["full"] } -tokio-util.workspace = true +tokio-util = { workspace = true, features = ["rt"] } tracing.workspace = true url = { workspace = true, features = ["serde"] } diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs 
b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs index c77c2f79b4..f59e0dccac 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs @@ -4,7 +4,6 @@ use std::collections::BTreeMap; use std::sync::Arc; use async_trait::async_trait; -use futures::channel::oneshot; #[cfg(test)] use mockall::automock; use papyrus_config::dumping::{ser_param, SerializeConfig}; @@ -13,7 +12,7 @@ use reqwest::Client; use serde::{Deserialize, Serialize}; use starknet_api::block::BlockNumber; use tokio::sync::Mutex; -use tokio::task::{self}; +use tokio::task::{self, JoinHandle}; use tracing::debug; use url::Url; @@ -31,7 +30,7 @@ pub trait CendeContext: Send + Sync { /// Write the previous height blob to Aerospike. Returns a cell with an inner boolean indicating /// whether the write was successful. /// `height` is the height of the block that is built when calling this function. - fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver; + fn write_prev_height_blob(&self, height: BlockNumber) -> JoinHandle; // Prepares the previous height blob that will be written in the next height. 
async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters); @@ -94,8 +93,7 @@ impl SerializeConfig for CendeConfig { #[async_trait] impl CendeContext for CendeAmbassador { - fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver { - let (sender, receiver) = oneshot::channel(); + fn write_prev_height_blob(&self, height: BlockNumber) -> JoinHandle { let prev_height_blob = self.prev_height_blob.clone(); let request_builder = self.client.post(self.url.clone()); task::spawn(async move { @@ -110,15 +108,13 @@ impl CendeContext for CendeAmbassador { writing to Aerospike", height ); - oneshot_send(sender, true); - return; + return true; } let Some(ref blob) = *prev_height_blob.lock().await else { // This case happens when restarting the node, `prev_height_blob` intial value is // `None`. debug!("No blob to write to Aerospike."); - oneshot_send(sender, false); - return; + return false; }; // TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS. debug!("Writing blob to Aerospike."); @@ -126,27 +122,20 @@ impl CendeContext for CendeAmbassador { Ok(response) => { if response.status().is_success() { debug!("Blob written to Aerospike successfully."); - oneshot_send(sender, true); + true } else { debug!("The recorder failed to write blob. Error: {}", response.status()); - oneshot_send(sender, false); + false } } Err(err) => { debug!("Failed to send a request to the recorder. Error: {}", err); // TODO(dvir): change this to `false`. The reason for the current value is to // make the `end_to_end_flow_test` to pass. - oneshot_send(sender, true); + true } } - }); - - return receiver; - - // Helper function to send a boolean result to a one-shot sender. 
- fn oneshot_send(sender: oneshot::Sender, result: bool) { - sender.send(result).expect("Writing to a one-shot sender should succeed."); - } + }) } async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters) { diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index c52ae83682..a831bb3c37 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -11,7 +11,7 @@ use std::time::Duration; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; -use futures::{SinkExt, StreamExt}; +use futures::{FutureExt, SinkExt, StreamExt}; use papyrus_consensus::types::{ ConsensusContext, ConsensusError, @@ -62,6 +62,7 @@ use starknet_state_sync_types::communication::SharedStateSyncClient; use starknet_state_sync_types::state_sync_types::SyncBlock; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use tokio_util::task::AbortOnDropHandle; use tracing::{debug, debug_span, info, instrument, trace, warn, Instrument}; use crate::cende::{BlobParameters, CendeContext}; @@ -189,8 +190,9 @@ impl ConsensusContext for SequencerConsensusContext { timeout: Duration, ) -> oneshot::Receiver { info!("Building proposal: timeout={timeout:?}"); - let cende_write_success = - self.cende_ambassador.write_prev_height_blob(proposal_init.height); + let cende_write_success = AbortOnDropHandle::new( + self.cende_ambassador.write_prev_height_blob(proposal_init.height), + ); // Handles interrupting an active proposal from a previous height/round self.set_height_and_round(proposal_init.height, proposal_init.round).await; @@ -478,7 +480,7 @@ async fn build_proposal( batcher: Arc, valid_proposals: Arc>, proposal_id: ProposalId, - cende_write_success: oneshot::Receiver, + cende_write_success: 
AbortOnDropHandle, gas_prices: GasPrices, ) { initialize_build(proposal_id, &proposal_init, timeout, batcher.as_ref(), gas_prices).await; @@ -558,7 +560,7 @@ async fn get_proposal_content( proposal_id: ProposalId, batcher: &dyn BatcherClient, mut proposal_sender: mpsc::Sender, - mut cende_write_success: oneshot::Receiver, + cende_write_success: AbortOnDropHandle, ) -> Option<(ProposalContentId, Vec)> { let mut content = Vec::new(); loop { @@ -603,16 +605,20 @@ async fn get_proposal_content( // If the blob writing operation to Aerospike doesn't return a success status, we // can't finish the proposal. - match cende_write_success.try_recv() { - Ok(Some(true)) => { + match cende_write_success.now_or_never() { + Some(Ok(true)) => { debug!("Writing blob to Aerospike completed."); } - Ok(Some(false)) => { - debug!("Writing blob to Aerospike failed."); + Some(Ok(false)) => { + warn!("Writing blob to Aerospike failed."); + return None; + } + Some(Err(e)) => { + warn!("Writing blob to Aerospike failed. 
Error: {e:?}"); return None; } - _ => { - debug!("Writing blob to Aerospike didn't return in time."); + None => { + warn!("Writing blob to Aerospike didn't return in time."); return None; } } diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs index 78a034338d..36b75d4be2 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context_test.rs @@ -1,8 +1,10 @@ +use std::future::ready; use std::sync::{Arc, OnceLock}; use std::time::Duration; use std::vec; use futures::channel::{mpsc, oneshot}; +use futures::future::pending; use futures::{FutureExt, SinkExt}; use lazy_static::lazy_static; use papyrus_consensus::stream_handler::StreamHandler; @@ -150,12 +152,7 @@ async fn build_proposal_setup( // Returns a mock CendeContext that will return a successful write_prev_height_blob. 
fn success_cende_ammbassador() -> MockCendeContext { let mut mock_cende = MockCendeContext::new(); - mock_cende.expect_write_prev_height_blob().returning(|_height| { - let (sender, receiver) = oneshot::channel(); - sender.send(true).unwrap(); - receiver - }); - + mock_cende.expect_write_prev_height_blob().return_once(|_height| tokio::spawn(ready(true))); mock_cende } @@ -445,11 +442,9 @@ async fn build_proposal() { #[tokio::test] async fn build_proposal_cende_failure() { let mut mock_cende_context = MockCendeContext::new(); - mock_cende_context.expect_write_prev_height_blob().times(1).returning(|_height| { - let (sender, receiver) = oneshot::channel(); - sender.send(false).unwrap(); - receiver - }); + mock_cende_context + .expect_write_prev_height_blob() + .return_once(|_height| tokio::spawn(ready(false))); let (fin_receiver, _network) = build_proposal_setup(mock_cende_context).await; @@ -459,11 +454,11 @@ async fn build_proposal_cende_failure() { #[tokio::test] async fn build_proposal_cende_incomplete() { let mut mock_cende_context = MockCendeContext::new(); - let (sender, receiver) = oneshot::channel(); - mock_cende_context.expect_write_prev_height_blob().times(1).return_once(|_height| receiver); + mock_cende_context + .expect_write_prev_height_blob() + .return_once(|_height| tokio::spawn(pending())); let (fin_receiver, _network) = build_proposal_setup(mock_cende_context).await; assert_eq!(fin_receiver.await, Err(oneshot::Canceled)); - drop(sender); }