Skip to content

Commit

Permalink
refactor(sequencing): cende context, add logic and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
DvirYo-starkware committed Jan 1, 2025
1 parent 10cbfe1 commit 01ab55f
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 40 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ url = { workspace = true, features = ["serde"] }
infra_utils.workspace = true
lazy_static.workspace = true
mockall.workspace = true
mockito.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_storage = { workspace = true, features = ["testing"] }
papyrus_test_utils.workspace = true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use rstest::rstest;
use starknet_api::block::BlockNumber;

use super::{CendeAmbassador, RECORDER_WRITE_BLOB_PATH};
use crate::cende::{BlobParameters, CendeConfig, CendeContext};

const HEIGHT_TO_WRITE: u64 = 10;

#[rstest]
#[case::success(200, Some(9), 1, true)]
#[case::no_prev_block(200, None, 0, false)]
#[case::prev_block_height_mismatch(200, Some(7), 0, false)]
#[case::recorder_return_error(500, Some(9), 1, false)]
#[tokio::test]
async fn write_prev_height_blob(
#[case] status_code: usize,
#[case] prev_block: Option<u64>,
#[case] expected_calls: usize,
#[case] expected_result: bool,
) {
let mut server = mockito::Server::new_async().await;
let url = server.url();
let mock = server.mock("POST", RECORDER_WRITE_BLOB_PATH).with_status(status_code).create();

let cende_ambassador = CendeAmbassador::new(CendeConfig { recorder_url: url.parse().unwrap() });

if let Some(prev_block) = prev_block {
cende_ambassador.prepare_blob_for_next_height(BlobParameters { height: prev_block }).await;
}

let receiver = cende_ambassador.write_prev_height_blob(BlockNumber(HEIGHT_TO_WRITE));

assert_eq!(receiver.await.unwrap(), expected_result);
mock.expect(expected_calls).assert();
}

#[tokio::test]
async fn prepare_blob_for_next_height() {
let cende_ambassador =
CendeAmbassador::new(CendeConfig { recorder_url: "http://parsable_url".parse().unwrap() });

cende_ambassador.prepare_blob_for_next_height(BlobParameters { height: HEIGHT_TO_WRITE }).await;

assert_eq!(
cende_ambassador.prev_height_blob.lock().await.as_ref().unwrap().block_number.0,
HEIGHT_TO_WRITE
);
}
107 changes: 71 additions & 36 deletions crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(test)]
mod cende_test;
mod central_objects;

use std::collections::BTreeMap;
Expand All @@ -9,29 +11,28 @@ use futures::channel::oneshot;
use mockall::automock;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use reqwest::Client;
use reqwest::{Client, RequestBuilder};
use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use tokio::sync::Mutex;
use tokio::task::{self};
use tracing::debug;
use url::Url;

// TODO(dvir): add tests when will have more logic.

/// A chunk of all the data to write to Aersopike.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct AerospikeBlob {
// TODO(yael, dvir): add the blob fields.
block_number: BlockNumber,
}

#[cfg_attr(test, automock)]
#[async_trait]
pub trait CendeContext: Send + Sync {
/// Write the previous height blob to Aerospike. Returns a cell with an inner boolean indicating
/// whether the write was successful.
/// `height` is the height of the block that is built when calling this function.
fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver<bool>;
/// `current_height` is the height of the block that is built when calling this function.
fn write_prev_height_blob(&self, current_height: BlockNumber) -> oneshot::Receiver<bool>;

// Prepares the previous height blob that will be written in the next height.
async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters);
Expand All @@ -48,7 +49,7 @@ pub struct CendeAmbassador {
}

/// The path to write blob in the Recorder.
const RECORDER_WRITE_BLOB_PATH: &str = "/write_blob";
pub const RECORDER_WRITE_BLOB_PATH: &str = "/cende/write_blob";

impl CendeAmbassador {
pub fn new(cende_config: CendeConfig) -> Self {
Expand Down Expand Up @@ -94,58 +95,85 @@ impl SerializeConfig for CendeConfig {

#[async_trait]
impl CendeContext for CendeAmbassador {
fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver<bool> {
fn write_prev_height_blob(&self, current_height: BlockNumber) -> oneshot::Receiver<bool> {
let (sender, receiver) = oneshot::channel();
let prev_height_blob = self.prev_height_blob.clone();
let request_builder = self.client.post(self.url.clone());
task::spawn(async move {
// TODO(dvir): remove this when handle the booting up case.
// Heights that are permitted to be built without writing to Aerospike.
// Height 1 to make `end_to_end_flow` test pass.
const SKIP_WRITE_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)];

if SKIP_WRITE_HEIGHTS.contains(&height) {
debug!(
"height {} is in `SKIP_WRITE_HEIGHTS`, consensus can send proposal without \
writing to Aerospike",
height
);
oneshot_send(sender, true);
return;
}
let Some(ref blob) = *prev_height_blob.lock().await else {
// TODO(dvir): remove this when handle the booting up case.
// Heights that are permitted to be built without writing to Aerospike.
// Height 1 to make `end_to_end_flow` test pass.
const SKIP_WRITE_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)];

if SKIP_WRITE_HEIGHTS.contains(&current_height) {
debug!(
"height {} is in `SKIP_WRITE_HEIGHTS`, consensus can send proposal without \
writing to Aerospike",
current_height
);
oneshot_send(sender, true);
return receiver;
}

task::spawn(async move {
let Some(ref blob): Option<AerospikeBlob> = *prev_height_blob.lock().await else {
// This case happens when restarting the node, `prev_height_blob` intial value is
// `None`.
debug!("No blob to write to Aerospike.");
oneshot_send(sender, false);
return;
};
// TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS.

// Can happen in case the consensus got a block from the state sync and due to that did
// not update the cende ambassador in `decision_reached` function.
// TODO(dvir): what to do in the case of the `blob.block_number.0 >= height.0`? this
// means a bug.
if blob.block_number.0 + 1 != current_height.0 {
debug!(
"Mismatch blob block number and height, can't write blob to Aerospike. Blob \
block number {}, height {}",
blob.block_number, current_height
);
oneshot_send(sender, false);
return;
}

debug!("Writing blob to Aerospike.");
send_write_blob(request_builder, blob, sender).await;
});

return receiver;

async fn send_write_blob(
request_builder: RequestBuilder,
blob: &AerospikeBlob,
sender: oneshot::Sender<bool>,
) {
// TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS.
match request_builder.json(blob).send().await {
Ok(response) => {
if response.status().is_success() {
debug!("Blob written to Aerospike successfully.");
oneshot_send(sender, true);
} else {
debug!("The recorder failed to write blob. Error: {}", response.status());
debug!(
"The recorder failed to write blob.\nStatus code: {}\nMessage: {}",
response.status(),
response.text().await.unwrap_or_default()
);
oneshot_send(sender, false);
}
}
Err(err) => {
// TODO(dvir): try to test this case.
debug!("Failed to send a request to the recorder. Error: {}", err);
// TODO(dvir): change this to `false`. The reason for the current value is to
// make the `end_to_end_flow_test` to pass.
oneshot_send(sender, true);
oneshot_send(sender, false);
}
}
});

return receiver;

}
// Helper function to send a boolean result to a one-shot sender.
fn oneshot_send(sender: oneshot::Sender<bool>, result: bool) {
sender.send(result).expect("Writing to a one-shot sender should succeed.");
sender.send(result).expect("Cende one-shot send failed, receiver was dropped.");
}
}

Expand All @@ -158,13 +186,20 @@ impl CendeContext for CendeAmbassador {

#[derive(Clone, Debug, Default)]
pub struct BlobParameters {
// TODO(dvir): add here all the information needed for creating the blob: tranasctions, classes,
// block info, BlockExecutionArtifacts.
height: u64,
// TODO(dvir): add here all the information needed for creating the blob: tranasctions,
// classes, block info, BlockExecutionArtifacts.
}

impl BlobParameters {
pub fn new(height: u64) -> Self {
BlobParameters { height }
}
}

impl From<BlobParameters> for AerospikeBlob {
fn from(_blob_parameters: BlobParameters) -> Self {
fn from(blob_parameters: BlobParameters) -> Self {
// TODO(yael): make the full creation of blob.
AerospikeBlob {}
AerospikeBlob { block_number: BlockNumber(blob_parameters.height) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl ConsensusContext for SequencerConsensusContext {
// TODO(dvir): pass here real `BlobParameters` info.
// TODO(dvir): when passing here the correct `BlobParameters`, also test that
// `prepare_blob_for_next_height` is called with the correct parameters.
self.cende_ambassador.prepare_blob_for_next_height(BlobParameters::default()).await;
self.cende_ambassador.prepare_blob_for_next_height(BlobParameters::new(height)).await;

let transaction_hashes =
transactions.iter().map(|tx| tx.tx_hash()).collect::<Vec<TransactionHash>>();
Expand Down
2 changes: 2 additions & 0 deletions crates/starknet_integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ workspace = true

[dependencies]
assert_matches.workspace = true
axum.workspace = true
blockifier.workspace = true
cairo-lang-starknet-classes.workspace = true
futures.workspace = true
Expand All @@ -19,6 +20,7 @@ mempool_test_utils.workspace = true
papyrus_common.workspace = true
papyrus_config.workspace = true
papyrus_consensus.workspace = true
papyrus_consensus_orchestrator.workspace = true
papyrus_execution.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_protobuf.workspace = true
Expand Down
25 changes: 22 additions & 3 deletions crates/starknet_integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ use std::future::Future;
use std::net::SocketAddr;
use std::time::Duration;

use axum::http::StatusCode;
use axum::routing::post;
use axum::Router;
use blockifier::context::ChainInfo;
use blockifier::test_utils::contracts::FeatureContract;
use blockifier::test_utils::{CairoVersion, RunnableCairo1};
use mempool_test_utils::starknet_api_test_utils::{AccountId, MultiAccountTransactionGenerator};
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::types::ValidatorId;
use papyrus_consensus_orchestrator::cende::{CendeConfig, RECORDER_WRITE_BLOB_PATH};
use papyrus_network::network_manager::test_utils::{
create_connected_network_configs,
create_network_configs_connected_to_broadcast_channels,
Expand Down Expand Up @@ -64,6 +68,7 @@ pub async fn create_node_config(
component_config: ComponentConfig,
) -> (SequencerNodeConfig, RequiredParams) {
let validator_id = set_validator_id(&mut consensus_manager_config, sequencer_index);
let recorder_url = consensus_manager_config.cende_config.recorder_url.clone();
let fee_token_addresses = chain_info.fee_token_addresses.clone();
let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone());
let gateway_config = create_gateway_config(chain_info.clone()).await;
Expand Down Expand Up @@ -93,8 +98,7 @@ pub async fn create_node_config(
eth_fee_token_address: fee_token_addresses.eth_fee_token_address,
strk_fee_token_address: fee_token_addresses.strk_fee_token_address,
validator_id,
// TODO(dvir): change this to real value when add recorder to integration tests.
recorder_url: Url::parse("https://recorder_url").expect("The URL is valid"),
recorder_url,
},
)
}
Expand All @@ -121,6 +125,8 @@ pub fn create_consensus_manager_configs_and_channels(
timeouts.prevote_timeout *= 3;
timeouts.proposal_timeout *= 3;

let recorder_url = spawn_success_recorder(available_ports.get_next_port());

let consensus_manager_configs = network_configs
.into_iter()
// TODO(Matan): Get config from default config file.
Expand All @@ -134,13 +140,26 @@ pub fn create_consensus_manager_configs_and_channels(
timeouts: timeouts.clone(),
..Default::default()
},
..Default::default()
cende_config: CendeConfig { recorder_url: recorder_url.clone(), },
})
.collect();

(consensus_manager_configs, broadcast_channels)
}

// Creates a local recorder server that always returns a success status.
pub fn spawn_success_recorder(port: u16) -> Url {
// [127, 0, 0, 1] is the localhost IP address.
let socket_addr = SocketAddr::from(([127, 0, 0, 1], port));
tokio::spawn(async move {
let router = Router::new()
.route(RECORDER_WRITE_BLOB_PATH, post(move || async { StatusCode::OK.to_string() }));
axum::Server::bind(&socket_addr).serve(router.into_make_service()).await.unwrap();
});

Url::parse(&format!("http://{}", socket_addr)).expect("Parsing recorder url fail")
}

pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig {
// TODO(Tsabary): get the latest version from the RPC crate.
const RPC_SPEC_VERSION: &str = "V0_8";
Expand Down

0 comments on commit 01ab55f

Please sign in to comment.