diff --git a/Cargo.lock b/Cargo.lock index f3f611fb9a..d9a8193d67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7408,6 +7408,7 @@ dependencies = [ "infra_utils", "lazy_static", "mockall", + "mockito 1.5.0", "papyrus_config", "papyrus_consensus", "papyrus_network", @@ -10518,6 +10519,7 @@ name = "starknet_integration_tests" version = "0.0.0" dependencies = [ "assert_matches", + "axum", "blockifier", "cairo-lang-starknet-classes", "futures", @@ -10528,6 +10530,7 @@ dependencies = [ "papyrus_common", "papyrus_config", "papyrus_consensus", + "papyrus_consensus_orchestrator", "papyrus_execution", "papyrus_network", "papyrus_protobuf", diff --git a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml index a9e4a2ee2d..1815c4af01 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml +++ b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml @@ -33,6 +33,7 @@ blockifier.workspace = true infra_utils.workspace = true lazy_static.workspace = true mockall.workspace = true +mockito.workspace = true papyrus_network = { workspace = true, features = ["testing"] } papyrus_storage = { workspace = true, features = ["testing"] } papyrus_test_utils.workspace = true diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/cende_test.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/cende_test.rs new file mode 100644 index 0000000000..8560a38f2b --- /dev/null +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/cende_test.rs @@ -0,0 +1,48 @@ +use rstest::rstest; +use starknet_api::block::BlockNumber; + +use super::{CendeAmbassador, RECORDER_WRITE_BLOB_PATH}; +use crate::cende::{BlobParameters, CendeConfig, CendeContext}; + +const HEIGHT_TO_WRITE: u64 = 10; + +#[rstest] +#[case::success(200, Some(9), 1, true)] +#[case::no_prev_block(200, None, 0, false)] +#[case::prev_block_height_mismatch(200, Some(7), 0, false)] +#[case::recorder_return_error(500, Some(9), 1, false)] +#[tokio::test] +async fn write_prev_height_blob( + #[case] mock_status_code: usize, + #[case] prev_block: Option, + #[case] expected_calls: usize, + #[case] expected_result: bool, +) { + let mut server = mockito::Server::new_async().await; + let url = server.url(); + let mock = server.mock("POST", RECORDER_WRITE_BLOB_PATH).with_status(mock_status_code).create(); + + let cende_ambassador = CendeAmbassador::new(CendeConfig { recorder_url: url.parse().unwrap() }); + + if let Some(prev_block) = prev_block { + cende_ambassador.prepare_blob_for_next_height(BlobParameters { height: prev_block }).await; + } + + let receiver = cende_ambassador.write_prev_height_blob(BlockNumber(HEIGHT_TO_WRITE)); + + assert_eq!(receiver.await.unwrap(), expected_result); + mock.expect(expected_calls).assert(); +} + +#[tokio::test] +async fn prepare_blob_for_next_height() { + let cende_ambassador = + CendeAmbassador::new(CendeConfig { recorder_url: "http://parsable_url".parse().unwrap() }); + + cende_ambassador.prepare_blob_for_next_height(BlobParameters { height: HEIGHT_TO_WRITE }).await; + + assert_eq!( + cende_ambassador.prev_height_blob.lock().await.as_ref().unwrap().block_number.0, + HEIGHT_TO_WRITE + ); +} diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs index f59e0dccac..879bf4f194 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs @@ -1,6 +1,9 @@ +#[cfg(test)] +mod cende_test; mod central_objects; use std::collections::BTreeMap; +use std::future::ready; use std::sync::Arc; use async_trait::async_trait; @@ -8,7 +11,7 @@ use async_trait::async_trait; use mockall::automock; use papyrus_config::dumping::{ser_param, SerializeConfig}; use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; -use reqwest::Client; +use reqwest::{Client, RequestBuilder}; use serde::{Deserialize, Serialize}; use starknet_api::block::BlockNumber; use tokio::sync::Mutex; @@ -16,12 +19,11 @@ use tokio::task::{self, JoinHandle}; use tracing::debug; use url::Url; -// TODO(dvir): add tests when will have more logic. - /// A chunk of all the data to write to Aersopike. #[derive(Debug, Serialize, Deserialize)] pub(crate) struct AerospikeBlob { // TODO(yael, dvir): add the blob fields. + block_number: BlockNumber, } #[cfg_attr(test, automock)] @@ -29,8 +31,8 @@ pub(crate) struct AerospikeBlob { pub trait CendeContext: Send + Sync { /// Write the previous height blob to Aerospike. Returns a cell with an inner boolean indicating /// whether the write was successful. - /// `height` is the height of the block that is built when calling this function. - fn write_prev_height_blob(&self, height: BlockNumber) -> JoinHandle; + /// `current_height` is the height of the block that is built when calling this function. + fn write_prev_height_blob(&self, current_height: BlockNumber) -> JoinHandle; // Prepares the previous height blob that will be written in the next height. async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters); @@ -47,7 +49,7 @@ pub struct CendeAmbassador { } /// The path to write blob in the Recorder. -const RECORDER_WRITE_BLOB_PATH: &str = "/write_blob"; +pub const RECORDER_WRITE_BLOB_PATH: &str = "/cende/write_blob"; impl CendeAmbassador { pub fn new(cende_config: CendeConfig) -> Self { @@ -93,48 +95,49 @@ impl SerializeConfig for CendeConfig { #[async_trait] impl CendeContext for CendeAmbassador { - fn write_prev_height_blob(&self, height: BlockNumber) -> JoinHandle { + fn write_prev_height_blob(&self, current_height: BlockNumber) -> JoinHandle { + // TODO(dvir): consider returning a future that will be spawned in the context instead. let prev_height_blob = self.prev_height_blob.clone(); let request_builder = self.client.post(self.url.clone()); - task::spawn(async move { - // TODO(dvir): remove this when handle the booting up case. - // Heights that are permitted to be built without writing to Aerospike. - // Height 1 to make `end_to_end_flow` test pass. - const SKIP_WRITE_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)]; - if SKIP_WRITE_HEIGHTS.contains(&height) { - debug!( - "height {} is in `SKIP_WRITE_HEIGHTS`, consensus can send proposal without \ - writing to Aerospike", - height - ); - return true; - } - let Some(ref blob) = *prev_height_blob.lock().await else { + // TODO(dvir): remove this when handle the booting up case. + // Heights that are permitted to be built without writing to Aerospike. + // Height 1 to make `end_to_end_flow` test pass. + const SKIP_WRITE_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)]; + + if SKIP_WRITE_HEIGHTS.contains(¤t_height) { + debug!( + "height {} is in `SKIP_WRITE_HEIGHTS`, consensus can send proposal without \ + writing to Aerospike", + current_height + ); + return tokio::spawn(ready(true)); + } + + task::spawn(async move { + // TODO(dvir): consider extracting the "should write blob" logic to a function. + let Some(ref blob): Option = *prev_height_blob.lock().await else { // This case happens when restarting the node, `prev_height_blob` intial value is // `None`. debug!("No blob to write to Aerospike."); return false; }; - // TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS. - debug!("Writing blob to Aerospike."); - match request_builder.json(blob).send().await { - Ok(response) => { - if response.status().is_success() { - debug!("Blob written to Aerospike successfully."); - true - } else { - debug!("The recorder failed to write blob. Error: {}", response.status()); - false - } - } - Err(err) => { - debug!("Failed to send a request to the recorder. Error: {}", err); - // TODO(dvir): change this to `false`. The reason for the current value is to - // make the `end_to_end_flow_test` to pass. - true - } + + // Can happen in case the consensus got a block from the state sync and due to that did + // not update the cende ambassador in `decision_reached` function. + // TODO(dvir): what to do in the case of the `blob.block_number.0 >= height.0`? this + // means a bug. + if blob.block_number.0 + 1 != current_height.0 { + debug!( + "Mismatch blob block number and height, can't write blob to Aerospike. Blob \ + block number {}, height {}", + blob.block_number, current_height + ); + return false; } + + debug!("Writing blob to Aerospike."); + return send_write_blob(request_builder, blob).await; }) } @@ -145,15 +148,46 @@ impl CendeContext for CendeAmbassador { } } +async fn send_write_blob(request_builder: RequestBuilder, blob: &AerospikeBlob) -> bool { + // TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS. + match request_builder.json(blob).send().await { + Ok(response) => { + if response.status().is_success() { + debug!("Blob written to Aerospike successfully."); + true + } else { + debug!( + "The recorder failed to write blob.\nStatus code: {}\nMessage: {}", + response.status(), + response.text().await.unwrap_or_default() + ); + false + } + } + Err(err) => { + // TODO(dvir): try to test this case. + debug!("Failed to send a request to the recorder. Error: {}", err); + false + } + } +} + #[derive(Clone, Debug, Default)] pub struct BlobParameters { - // TODO(dvir): add here all the information needed for creating the blob: tranasctions, classes, - // block info, BlockExecutionArtifacts. + height: u64, + // TODO(dvir): add here all the information needed for creating the blob: tranasctions, + // classes, block info, BlockExecutionArtifacts. +} + +impl BlobParameters { + pub fn new(height: u64) -> Self { + BlobParameters { height } + } } impl From for AerospikeBlob { - fn from(_blob_parameters: BlobParameters) -> Self { + fn from(blob_parameters: BlobParameters) -> Self { // TODO(yael): make the full creation of blob. - AerospikeBlob {} + AerospikeBlob { block_number: BlockNumber(blob_parameters.height) } } } diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index a831bb3c37..e940b12f47 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -337,7 +337,7 @@ impl ConsensusContext for SequencerConsensusContext { // TODO(dvir): pass here real `BlobParameters` info. // TODO(dvir): when passing here the correct `BlobParameters`, also test that // `prepare_blob_for_next_height` is called with the correct parameters. - self.cende_ambassador.prepare_blob_for_next_height(BlobParameters::default()).await; + self.cende_ambassador.prepare_blob_for_next_height(BlobParameters::new(height)).await; let transaction_hashes = transactions.iter().map(|tx| tx.tx_hash()).collect::>(); diff --git a/crates/starknet_integration_tests/Cargo.toml b/crates/starknet_integration_tests/Cargo.toml index 546754cb30..014765eaa3 100644 --- a/crates/starknet_integration_tests/Cargo.toml +++ b/crates/starknet_integration_tests/Cargo.toml @@ -10,6 +10,7 @@ workspace = true [dependencies] assert_matches.workspace = true +axum.workspace = true blockifier.workspace = true cairo-lang-starknet-classes.workspace = true futures.workspace = true @@ -19,6 +20,7 @@ mempool_test_utils.workspace = true papyrus_common.workspace = true papyrus_config.workspace = true papyrus_consensus.workspace = true +papyrus_consensus_orchestrator.workspace = true papyrus_execution.workspace = true papyrus_network = { workspace = true, features = ["testing"] } papyrus_protobuf.workspace = true diff --git a/crates/starknet_integration_tests/src/flow_test_setup.rs b/crates/starknet_integration_tests/src/flow_test_setup.rs index 7480d85a22..dd222b80ca 100644 --- a/crates/starknet_integration_tests/src/flow_test_setup.rs +++ b/crates/starknet_integration_tests/src/flow_test_setup.rs @@ -30,6 +30,7 @@ use crate::utils::{ create_consensus_manager_configs_and_channels, create_mempool_p2p_configs, create_node_config, + spawn_success_recorder, }; const SEQUENCER_0: usize = 0; @@ -120,12 +121,15 @@ impl FlowSequencerSetup { accounts: Vec, sequencer_index: usize, chain_info: ChainInfo, - consensus_manager_config: ConsensusManagerConfig, + mut consensus_manager_config: ConsensusManagerConfig, mempool_p2p_config: MempoolP2pConfig, mut available_ports: AvailablePorts, ) -> Self { let storage_for_test = StorageTestSetup::new(accounts, &chain_info); + let recorder_url = spawn_success_recorder(available_ports.get_next_port()); + consensus_manager_config.cende_config.recorder_url = recorder_url; + let component_config = ComponentConfig::default(); // Derive the configuration for the sequencer node. diff --git a/crates/starknet_integration_tests/src/integration_test_setup.rs b/crates/starknet_integration_tests/src/integration_test_setup.rs index ce6d0f7adb..2161747ba5 100644 --- a/crates/starknet_integration_tests/src/integration_test_setup.rs +++ b/crates/starknet_integration_tests/src/integration_test_setup.rs @@ -19,7 +19,7 @@ use tracing::instrument; use crate::config_utils::dump_config_file_changes; use crate::state_reader::StorageTestSetup; -use crate::utils::create_node_config; +use crate::utils::{create_node_config, spawn_success_recorder}; pub struct SequencerSetup { /// Used to differentiate between different sequencer nodes. @@ -53,7 +53,7 @@ impl SequencerSetup { accounts: Vec, sequencer_index: usize, chain_info: ChainInfo, - consensus_manager_config: ConsensusManagerConfig, + mut consensus_manager_config: ConsensusManagerConfig, mempool_p2p_config: MempoolP2pConfig, available_ports: &mut AvailablePorts, component_config: ComponentConfig, @@ -61,6 +61,9 @@ impl SequencerSetup { // Creating the storage for the test. let storage_for_test = StorageTestSetup::new(accounts, &chain_info); + let recorder_url = spawn_success_recorder(available_ports.get_next_port()); + consensus_manager_config.cende_config.recorder_url = recorder_url; + // Derive the configuration for the sequencer node. let (config, required_params) = create_node_config( available_ports, diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index e2f22a8be8..70f809726a 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -1,12 +1,17 @@ use std::future::Future; +use std::net::SocketAddr; use std::time::Duration; +use axum::http::StatusCode; +use axum::routing::post; +use axum::Router; use blockifier::context::ChainInfo; use blockifier::test_utils::contracts::FeatureContract; use blockifier::test_utils::{CairoVersion, RunnableCairo1}; use mempool_test_utils::starknet_api_test_utils::{AccountId, MultiAccountTransactionGenerator}; use papyrus_consensus::config::ConsensusConfig; use papyrus_consensus::types::ValidatorId; +use papyrus_consensus_orchestrator::cende::RECORDER_WRITE_BLOB_PATH; use papyrus_network::network_manager::test_utils::{ create_connected_network_configs, create_network_configs_connected_to_broadcast_channels, @@ -64,6 +69,7 @@ pub async fn create_node_config( component_config: ComponentConfig, ) -> (SequencerNodeConfig, RequiredParams) { let validator_id = set_validator_id(&mut consensus_manager_config, sequencer_index); + let recorder_url = consensus_manager_config.cende_config.recorder_url.clone(); let fee_token_addresses = chain_info.fee_token_addresses.clone(); let batcher_config = create_batcher_config(batcher_storage_config, chain_info.clone()); let gateway_config = create_gateway_config(chain_info.clone()).await; @@ -91,8 +97,7 @@ pub async fn create_node_config( eth_fee_token_address: fee_token_addresses.eth_fee_token_address, strk_fee_token_address: fee_token_addresses.strk_fee_token_address, validator_id, - // TODO(dvir): change this to real value when add recorder to integration tests. - recorder_url: Url::parse("https://recorder_url").expect("Should be a valid URL"), + recorder_url, base_layer_config: EthereumBaseLayerConfigRequiredParams { node_url: Url::parse("https://node_url").expect("Should be a valid URL"), }, @@ -142,6 +147,19 @@ pub fn create_consensus_manager_configs_and_channels( (consensus_manager_configs, broadcast_channels) } +// Creates a local recorder server that always returns a success status. +pub fn spawn_success_recorder(port: u16) -> Url { + // [127, 0, 0, 1] is the localhost IP address. + let socket_addr = SocketAddr::from(([127, 0, 0, 1], port)); + tokio::spawn(async move { + let router = Router::new() + .route(RECORDER_WRITE_BLOB_PATH, post(move || async { StatusCode::OK.to_string() })); + axum::Server::bind(&socket_addr).serve(router.into_make_service()).await.unwrap(); + }); + + Url::parse(&format!("http://{}", socket_addr)).expect("Parsing recorder url fail") +} + pub fn create_mempool_p2p_configs(chain_id: ChainId, ports: Vec) -> Vec { create_connected_network_configs(ports) .into_iter()