Skip to content

Commit

Permalink
fix(sequencing): cende panics if late (#3117)
Browse files Browse the repository at this point in the history
Cende source was written to send a success signal over a channel and panic on failure.
Cende usage is meant to check once for completion and drop the receiver if incomplete.
This leads to panic if cende finishes late and sends after the receiver is dropped.

Also use AbortOnDropHandle to shut down the cende task once we no longer wait for it.
  • Loading branch information
matan-starkware authored Jan 6, 2025
1 parent 5841764 commit 6e57a80
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 46 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ starknet_api.workspace = true
starknet_batcher_types = { workspace = true, features = ["testing"] }
starknet_state_sync_types = { workspace = true, features = ["testing"] }
tokio = { workspace = true, features = ["full"] }
tokio-util.workspace = true
tokio-util = { workspace = true, features = ["rt"] }
tracing.workspace = true
url = { workspace = true, features = ["serde"] }

Expand Down
29 changes: 9 additions & 20 deletions crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::collections::BTreeMap;
use std::sync::Arc;

use async_trait::async_trait;
use futures::channel::oneshot;
#[cfg(test)]
use mockall::automock;
use papyrus_config::dumping::{ser_param, SerializeConfig};
Expand All @@ -13,7 +12,7 @@ use reqwest::Client;
use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use tokio::sync::Mutex;
use tokio::task::{self};
use tokio::task::{self, JoinHandle};
use tracing::debug;
use url::Url;

Expand All @@ -31,7 +30,7 @@ pub trait CendeContext: Send + Sync {
/// Write the previous height blob to Aerospike. Returns a cell with an inner boolean indicating
/// whether the write was successful.
/// `height` is the height of the block that is built when calling this function.
fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver<bool>;
fn write_prev_height_blob(&self, height: BlockNumber) -> JoinHandle<bool>;

// Prepares the previous height blob that will be written in the next height.
async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters);
Expand Down Expand Up @@ -94,8 +93,7 @@ impl SerializeConfig for CendeConfig {

#[async_trait]
impl CendeContext for CendeAmbassador {
fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver<bool> {
let (sender, receiver) = oneshot::channel();
fn write_prev_height_blob(&self, height: BlockNumber) -> JoinHandle<bool> {
let prev_height_blob = self.prev_height_blob.clone();
let request_builder = self.client.post(self.url.clone());
task::spawn(async move {
Expand All @@ -110,43 +108,34 @@ impl CendeContext for CendeAmbassador {
writing to Aerospike",
height
);
oneshot_send(sender, true);
return;
return true;
}
let Some(ref blob) = *prev_height_blob.lock().await else {
// This case happens when restarting the node, `prev_height_blob` intial value is
// `None`.
debug!("No blob to write to Aerospike.");
oneshot_send(sender, false);
return;
return false;
};
// TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS.
debug!("Writing blob to Aerospike.");
match request_builder.json(blob).send().await {
Ok(response) => {
if response.status().is_success() {
debug!("Blob written to Aerospike successfully.");
oneshot_send(sender, true);
true
} else {
debug!("The recorder failed to write blob. Error: {}", response.status());
oneshot_send(sender, false);
false
}
}
Err(err) => {
debug!("Failed to send a request to the recorder. Error: {}", err);
// TODO(dvir): change this to `false`. The reason for the current value is to
// make the `end_to_end_flow_test` to pass.
oneshot_send(sender, true);
true
}
}
});

return receiver;

// Helper function to send a boolean result to a one-shot sender.
fn oneshot_send(sender: oneshot::Sender<bool>, result: bool) {
sender.send(result).expect("Writing to a one-shot sender should succeed.");
}
})
}

async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::time::Duration;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
use futures::{FutureExt, SinkExt, StreamExt};
use papyrus_consensus::types::{
ConsensusContext,
ConsensusError,
Expand Down Expand Up @@ -62,6 +62,7 @@ use starknet_state_sync_types::communication::SharedStateSyncClient;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, debug_span, info, instrument, trace, warn, Instrument};

use crate::cende::{BlobParameters, CendeContext};
Expand Down Expand Up @@ -189,8 +190,9 @@ impl ConsensusContext for SequencerConsensusContext {
timeout: Duration,
) -> oneshot::Receiver<ProposalContentId> {
info!("Building proposal: timeout={timeout:?}");
let cende_write_success =
self.cende_ambassador.write_prev_height_blob(proposal_init.height);
let cende_write_success = AbortOnDropHandle::new(
self.cende_ambassador.write_prev_height_blob(proposal_init.height),
);
// Handles interrupting an active proposal from a previous height/round
self.set_height_and_round(proposal_init.height, proposal_init.round).await;

Expand Down Expand Up @@ -478,7 +480,7 @@ async fn build_proposal(
batcher: Arc<dyn BatcherClient>,
valid_proposals: Arc<Mutex<HeightToIdToContent>>,
proposal_id: ProposalId,
cende_write_success: oneshot::Receiver<bool>,
cende_write_success: AbortOnDropHandle<bool>,
gas_prices: GasPrices,
) {
initialize_build(proposal_id, &proposal_init, timeout, batcher.as_ref(), gas_prices).await;
Expand Down Expand Up @@ -558,7 +560,7 @@ async fn get_proposal_content(
proposal_id: ProposalId,
batcher: &dyn BatcherClient,
mut proposal_sender: mpsc::Sender<ProposalPart>,
mut cende_write_success: oneshot::Receiver<bool>,
cende_write_success: AbortOnDropHandle<bool>,
) -> Option<(ProposalContentId, Vec<ExecutableTransaction>)> {
let mut content = Vec::new();
loop {
Expand Down Expand Up @@ -603,16 +605,20 @@ async fn get_proposal_content(

// If the blob writing operation to Aerospike doesn't return a success status, we
// can't finish the proposal.
match cende_write_success.try_recv() {
Ok(Some(true)) => {
match cende_write_success.now_or_never() {
Some(Ok(true)) => {
debug!("Writing blob to Aerospike completed.");
}
Ok(Some(false)) => {
debug!("Writing blob to Aerospike failed.");
Some(Ok(false)) => {
warn!("Writing blob to Aerospike failed.");
return None;
}
Some(Err(e)) => {
warn!("Writing blob to Aerospike failed. Error: {e:?}");
return None;
}
_ => {
debug!("Writing blob to Aerospike didn't return in time.");
None => {
warn!("Writing blob to Aerospike didn't return in time.");
return None;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::future::ready;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use std::vec;

use futures::channel::{mpsc, oneshot};
use futures::future::pending;
use futures::{FutureExt, SinkExt};
use lazy_static::lazy_static;
use papyrus_consensus::stream_handler::StreamHandler;
Expand Down Expand Up @@ -150,12 +152,7 @@ async fn build_proposal_setup(
// Returns a mock CendeContext that will return a successful write_prev_height_blob.
fn success_cende_ammbassador() -> MockCendeContext {
let mut mock_cende = MockCendeContext::new();
mock_cende.expect_write_prev_height_blob().returning(|_height| {
let (sender, receiver) = oneshot::channel();
sender.send(true).unwrap();
receiver
});

mock_cende.expect_write_prev_height_blob().return_once(|_height| tokio::spawn(ready(true)));
mock_cende
}

Expand Down Expand Up @@ -445,11 +442,9 @@ async fn build_proposal() {
#[tokio::test]
async fn build_proposal_cende_failure() {
let mut mock_cende_context = MockCendeContext::new();
mock_cende_context.expect_write_prev_height_blob().times(1).returning(|_height| {
let (sender, receiver) = oneshot::channel();
sender.send(false).unwrap();
receiver
});
mock_cende_context
.expect_write_prev_height_blob()
.return_once(|_height| tokio::spawn(ready(false)));

let (fin_receiver, _network) = build_proposal_setup(mock_cende_context).await;

Expand All @@ -459,11 +454,11 @@ async fn build_proposal_cende_failure() {
#[tokio::test]
async fn build_proposal_cende_incomplete() {
let mut mock_cende_context = MockCendeContext::new();
let (sender, receiver) = oneshot::channel();
mock_cende_context.expect_write_prev_height_blob().times(1).return_once(|_height| receiver);
mock_cende_context
.expect_write_prev_height_blob()
.return_once(|_height| tokio::spawn(pending()));

let (fin_receiver, _network) = build_proposal_setup(mock_cende_context).await;

assert_eq!(fin_receiver.await, Err(oneshot::Canceled));
drop(sender);
}

0 comments on commit 6e57a80

Please sign in to comment.