Skip to content

Commit

Permalink
refactor(sequencing): cende context, add logic and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
DvirYo-starkware committed Jan 5, 2025
1 parent 71c1a30 commit 49ca25e
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 55 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ blockifier.workspace = true
infra_utils.workspace = true
lazy_static.workspace = true
mockall.workspace = true
mockito.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_storage = { workspace = true, features = ["testing"] }
papyrus_test_utils.workspace = true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use rstest::rstest;
use starknet_api::block::BlockNumber;

use super::{CendeAmbassador, RECORDER_WRITE_BLOB_PATH};
use crate::cende::{BlobParameters, CendeConfig, CendeContext};

const HEIGHT_TO_WRITE: u64 = 10;

#[rstest]
#[case::success(200, Some(9), 1, true)]
#[case::no_prev_block(200, None, 0, false)]
#[case::prev_block_height_mismatch(200, Some(7), 0, false)]
#[case::recorder_return_error(500, Some(9), 1, false)]
#[tokio::test]
async fn write_prev_height_blob(
#[case] mock_status_code: usize,
#[case] prev_block: Option<u64>,
#[case] expected_calls: usize,
#[case] expected_result: bool,
) {
let mut server = mockito::Server::new_async().await;
let url = server.url();
let mock = server.mock("POST", RECORDER_WRITE_BLOB_PATH).with_status(mock_status_code).create();

let cende_ambassador = CendeAmbassador::new(CendeConfig { recorder_url: url.parse().unwrap() });

if let Some(prev_block) = prev_block {
cende_ambassador.prepare_blob_for_next_height(BlobParameters { height: prev_block }).await;
}

let receiver = cende_ambassador.write_prev_height_blob(BlockNumber(HEIGHT_TO_WRITE));

assert_eq!(receiver.await.unwrap(), expected_result);
mock.expect(expected_calls).assert();
}

#[tokio::test]
async fn prepare_blob_for_next_height() {
let cende_ambassador =
CendeAmbassador::new(CendeConfig { recorder_url: "http://parsable_url".parse().unwrap() });

cende_ambassador.prepare_blob_for_next_height(BlobParameters { height: HEIGHT_TO_WRITE }).await;

assert_eq!(
cende_ambassador.prev_height_blob.lock().await.as_ref().unwrap().block_number.0,
HEIGHT_TO_WRITE
);
}
137 changes: 87 additions & 50 deletions crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(test)]
mod cende_test;
mod central_objects;

use std::collections::BTreeMap;
Expand All @@ -9,29 +11,28 @@ use futures::channel::oneshot;
use mockall::automock;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use reqwest::Client;
use reqwest::{Client, RequestBuilder};
use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use tokio::sync::Mutex;
use tokio::task::{self};
use tracing::debug;
use url::Url;

// TODO(dvir): add tests when will have more logic.

/// A chunk of all the data to write to Aersopike.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct AerospikeBlob {
// TODO(yael, dvir): add the blob fields.
block_number: BlockNumber,
}

#[cfg_attr(test, automock)]
#[async_trait]
pub trait CendeContext: Send + Sync {
/// Write the previous height blob to Aerospike. Returns a cell with an inner boolean indicating
/// whether the write was successful.
/// `height` is the height of the block that is built when calling this function.
fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver<bool>;
/// `current_height` is the height of the block that is built when calling this function.
fn write_prev_height_blob(&self, current_height: BlockNumber) -> oneshot::Receiver<bool>;

// Prepares the previous height blob that will be written in the next height.
async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters);
Expand All @@ -48,7 +49,7 @@ pub struct CendeAmbassador {
}

/// The path to write blob in the Recorder.
const RECORDER_WRITE_BLOB_PATH: &str = "/write_blob";
pub const RECORDER_WRITE_BLOB_PATH: &str = "/cende/write_blob";

impl CendeAmbassador {
pub fn new(cende_config: CendeConfig) -> Self {
Expand Down Expand Up @@ -94,59 +95,55 @@ impl SerializeConfig for CendeConfig {

#[async_trait]
impl CendeContext for CendeAmbassador {
fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver<bool> {
fn write_prev_height_blob(&self, current_height: BlockNumber) -> oneshot::Receiver<bool> {
let (sender, receiver) = oneshot::channel();
let prev_height_blob = self.prev_height_blob.clone();
let request_builder = self.client.post(self.url.clone());
task::spawn(async move {
// TODO(dvir): remove this when handle the booting up case.
// Heights that are permitted to be built without writing to Aerospike.
// Height 1 to make `end_to_end_flow` test pass.
const SKIP_WRITE_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)];

if SKIP_WRITE_HEIGHTS.contains(&height) {
debug!(
"height {} is in `SKIP_WRITE_HEIGHTS`, consensus can send proposal without \
writing to Aerospike",
height
);
oneshot_send(sender, true);
return;
}
let Some(ref blob) = *prev_height_blob.lock().await else {
// TODO(dvir): remove this when handle the booting up case.
// Heights that are permitted to be built without writing to Aerospike.
// Height 1 to make `end_to_end_flow` test pass.
const SKIP_WRITE_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)];

if SKIP_WRITE_HEIGHTS.contains(&current_height) {
debug!(
"height {} is in `SKIP_WRITE_HEIGHTS`, consensus can send proposal without \
writing to Aerospike",
current_height
);
oneshot_send(sender, true);
return receiver;
}

task::spawn(async move {
// TODO(dvir): consider extracting the "should write blob" logic to a function.
let Some(ref blob): Option<AerospikeBlob> = *prev_height_blob.lock().await else {
// This case happens when restarting the node, `prev_height_blob` intial value is
// `None`.
debug!("No blob to write to Aerospike.");
oneshot_send(sender, false);
return;
};
// TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS.
debug!("Writing blob to Aerospike.");
match request_builder.json(blob).send().await {
Ok(response) => {
if response.status().is_success() {
debug!("Blob written to Aerospike successfully.");
oneshot_send(sender, true);
} else {
debug!("The recorder failed to write blob. Error: {}", response.status());
oneshot_send(sender, false);
}
}
Err(err) => {
debug!("Failed to send a request to the recorder. Error: {}", err);
// TODO(dvir): change this to `false`. The reason for the current value is to
// make the `end_to_end_flow_test` to pass.
oneshot_send(sender, true);
}

// Can happen in case the consensus got a block from the state sync and due to that did
// not update the cende ambassador in `decision_reached` function.
// TODO(dvir): what to do in the case of the `blob.block_number.0 >= height.0`? this
// means a bug.
if blob.block_number.0 + 1 != current_height.0 {
debug!(
"Mismatch blob block number and height, can't write blob to Aerospike. Blob \
block number {}, height {}",
blob.block_number, current_height
);
oneshot_send(sender, false);
return;
}
});

return receiver;
debug!("Writing blob to Aerospike.");
send_write_blob(request_builder, blob, sender).await;
});

// Helper function to send a boolean result to a one-shot sender.
fn oneshot_send(sender: oneshot::Sender<bool>, result: bool) {
sender.send(result).expect("Writing to a one-shot sender should succeed.");
}
receiver
}

async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters) {
Expand All @@ -156,15 +153,55 @@ impl CendeContext for CendeAmbassador {
}
}

async fn send_write_blob(
request_builder: RequestBuilder,
blob: &AerospikeBlob,
sender: oneshot::Sender<bool>,
) {
// TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS.
match request_builder.json(blob).send().await {
Ok(response) => {
if response.status().is_success() {
debug!("Blob written to Aerospike successfully.");
oneshot_send(sender, true);
} else {
debug!(
"The recorder failed to write blob.\nStatus code: {}\nMessage: {}",
response.status(),
response.text().await.unwrap_or_default()
);
oneshot_send(sender, false);
}
}
Err(err) => {
// TODO(dvir): try to test this case.
debug!("Failed to send a request to the recorder. Error: {}", err);
oneshot_send(sender, false);
}
}
}

// Helper function to send a boolean result to a one-shot sender.
fn oneshot_send(sender: oneshot::Sender<bool>, result: bool) {
sender.send(result).expect("Cende one-shot send failed, receiver was dropped.");
}

#[derive(Clone, Debug, Default)]
pub struct BlobParameters {
// TODO(dvir): add here all the information needed for creating the blob: tranasctions, classes,
// block info, BlockExecutionArtifacts.
height: u64,
// TODO(dvir): add here all the information needed for creating the blob: tranasctions,
// classes, block info, BlockExecutionArtifacts.
}

impl BlobParameters {
pub fn new(height: u64) -> Self {
BlobParameters { height }
}
}

impl From<BlobParameters> for AerospikeBlob {
fn from(_blob_parameters: BlobParameters) -> Self {
fn from(blob_parameters: BlobParameters) -> Self {
// TODO(yael): make the full creation of blob.
AerospikeBlob {}
AerospikeBlob { block_number: BlockNumber(blob_parameters.height) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl ConsensusContext for SequencerConsensusContext {
// TODO(dvir): pass here real `BlobParameters` info.
// TODO(dvir): when passing here the correct `BlobParameters`, also test that
// `prepare_blob_for_next_height` is called with the correct parameters.
self.cende_ambassador.prepare_blob_for_next_height(BlobParameters::default()).await;
self.cende_ambassador.prepare_blob_for_next_height(BlobParameters::new(height)).await;

let transaction_hashes =
transactions.iter().map(|tx| tx.tx_hash()).collect::<Vec<TransactionHash>>();
Expand Down
2 changes: 2 additions & 0 deletions crates/starknet_integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ workspace = true

[dependencies]
assert_matches.workspace = true
axum.workspace = true
blockifier.workspace = true
cairo-lang-starknet-classes.workspace = true
futures.workspace = true
Expand All @@ -19,6 +20,7 @@ mempool_test_utils.workspace = true
papyrus_common.workspace = true
papyrus_config.workspace = true
papyrus_consensus.workspace = true
papyrus_consensus_orchestrator.workspace = true
papyrus_execution.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_protobuf.workspace = true
Expand Down
6 changes: 5 additions & 1 deletion crates/starknet_integration_tests/src/flow_test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::utils::{
create_consensus_manager_configs_and_channels,
create_mempool_p2p_configs,
create_node_config,
spawn_success_recorder,
};

const SEQUENCER_0: usize = 0;
Expand Down Expand Up @@ -120,12 +121,15 @@ impl FlowSequencerSetup {
accounts: Vec<AccountTransactionGenerator>,
sequencer_index: usize,
chain_info: ChainInfo,
consensus_manager_config: ConsensusManagerConfig,
mut consensus_manager_config: ConsensusManagerConfig,
mempool_p2p_config: MempoolP2pConfig,
mut available_ports: AvailablePorts,
) -> Self {
let storage_for_test = StorageTestSetup::new(accounts, &chain_info);

let recorder_url = spawn_success_recorder(available_ports.get_next_port());
consensus_manager_config.cende_config.recorder_url = recorder_url;

let component_config = ComponentConfig::default();

// Derive the configuration for the sequencer node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::utils::{
create_consensus_manager_configs_and_channels,
create_mempool_p2p_configs,
create_node_config,
spawn_success_recorder,
};

pub struct IntegrationTestSetup {
Expand Down Expand Up @@ -147,14 +148,17 @@ impl IntegrationSequencerSetup {
accounts: Vec<AccountTransactionGenerator>,
sequencer_index: usize,
chain_info: ChainInfo,
consensus_manager_config: ConsensusManagerConfig,
mut consensus_manager_config: ConsensusManagerConfig,
mempool_p2p_config: MempoolP2pConfig,
available_ports: &mut AvailablePorts,
component_config: ComponentConfig,
) -> Self {
// Creating the storage for the test.
let storage_for_test = StorageTestSetup::new(accounts, &chain_info);

let recorder_url = spawn_success_recorder(available_ports.get_next_port());
consensus_manager_config.cende_config.recorder_url = recorder_url;

// Derive the configuration for the sequencer node.
let (config, required_params) = create_node_config(
available_ports,
Expand Down
Loading

0 comments on commit 49ca25e

Please sign in to comment.