Skip to content

Commit

Permalink
feat(sequencing): add client to cende
Browse files Browse the repository at this point in the history
  • Loading branch information
DvirYo-starkware committed Dec 29, 2024
1 parent b79afdc commit 4bb9feb
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 18 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions config/sequencer/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,11 @@
"privacy": "Public",
"value": "0.0.0.0:8080"
},
"consensus_manager_config.cende_config.recorder_url": {
"description": "The URL of the Pythonic cende_recorder",
"pointer_target": "recorder_url",
"privacy": "Private"
},
"consensus_manager_config.consensus_config.chain_id": {
"description": "The chain id of the Starknet chain.",
"pointer_target": "chain_id",
Expand Down Expand Up @@ -944,6 +949,11 @@
"privacy": "Public",
"value": 8082
},
"recorder_url": {
"description": "A required param! The URL of the Pythonic cende_recorder",
"param_type": "String",
"privacy": "TemporaryValue"
},
"rpc_state_reader_config.json_rpc_version": {
"description": "The json rpc version.",
"privacy": "Public",
Expand Down
3 changes: 3 additions & 0 deletions crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ blockifier.workspace = true
chrono.workspace = true
futures.workspace = true
indexmap.workspace = true
papyrus_config.workspace = true
papyrus_consensus.workspace = true
papyrus_network.workspace = true
papyrus_protobuf.workspace = true
papyrus_storage.workspace = true
reqwest.workspace = true
serde.workspace = true
starknet-types-core.workspace = true
starknet_api.workspace = true
Expand All @@ -25,6 +27,7 @@ starknet_state_sync_types = { workspace = true, features = ["testing"] }
tokio = { workspace = true, features = ["full"] }
tokio-util.workspace = true
tracing.workspace = true
url = { workspace = true, features = ["serde"] }

[dev-dependencies]
infra_utils.workspace = true
Expand Down
97 changes: 83 additions & 14 deletions crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
mod central_objects;

use std::collections::BTreeMap;
use std::sync::Arc;

use async_trait::async_trait;
use futures::channel::oneshot;
#[cfg(test)]
use mockall::automock;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use tokio::sync::Mutex;
use tokio::task::{self};
use tracing::debug;
use url::Url;

// TODO(dvir): add tests when will have more logic.

/// A chunk of all the data to write to Aersopike.
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct AerospikeBlob {
// TODO(yael, dvir): add the blob fields.
}
Expand All @@ -31,17 +37,58 @@ pub trait CendeContext: Send + Sync {
async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters);
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug)]
pub struct CendeAmbassador {
// TODO(dvir): consider creating enum varaiant instead of the `Option<AerospikeBlob>`.
// `None` indicates that there is no blob to write, and therefore, the node can't be the
// proposer.
prev_height_blob: Arc<Mutex<Option<AerospikeBlob>>>,
url: Url,
client: Client,
}

/// The path to write blob in the Recorder.
const RECORDER_WRITE_BLOB_PATH: &str = "/write_blob";

impl CendeAmbassador {
pub fn new() -> Self {
CendeAmbassador { prev_height_blob: Arc::new(Mutex::new(None)) }
pub fn new(cende_config: CendeConfig) -> Self {
CendeAmbassador {
prev_height_blob: Arc::new(Mutex::new(None)),
url: cende_config
.recorder_url
.join(RECORDER_WRITE_BLOB_PATH)
.expect("Failed to join `RECORDER_WRITE_BLOB_PATH` with the Recorder URL"),
client: Client::new(),
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct CendeConfig {
pub recorder_url: Url,
}

impl Default for CendeConfig {
fn default() -> Self {
CendeConfig {
// TODO(dvir): change this default value to "https://<recorder_url>". The reason for the
// current value is to make the `end_to_end_flow_test` to pass (it creates the default
// config).
recorder_url: "https://recorder_url"
.parse()
.expect("recorder_url must be a valid Recorder URL"),
}
}
}

impl SerializeConfig for CendeConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from_iter([ser_param(
"recorder_url",
&self.recorder_url,
"The URL of the Pythonic cende_recorder",
ParamPrivacyInput::Private,
)])
}
}

Expand All @@ -50,34 +97,56 @@ impl CendeContext for CendeAmbassador {
fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver<bool> {
let (sender, receiver) = oneshot::channel();
let prev_height_blob = self.prev_height_blob.clone();
let request_builder = self.client.post(self.url.clone());
task::spawn(async move {
// TODO(dvir): remove this when handle the booting up case.
// Heights that are permitted to be built without writing to Aerospike.
// Height 1 to make `end_to_end_flow` test pass.
const PERMITTED_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)];
const SKIP_WRITE_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)];

if PERMITTED_HEIGHTS.contains(&height) {
if SKIP_WRITE_HEIGHTS.contains(&height) {
debug!(
"height {} is in `PERMITTED_HEIGHTS`, consensus can send proposal without \
"height {} is in `SKIP_WRITE_HEIGHTS`, consensus can send proposal without \
writing to Aerospike",
height
);
sender.send(true).unwrap();
oneshot_send(sender, true);
return;
}
let Some(ref _blob) = *prev_height_blob.lock().await else {
let Some(ref blob) = *prev_height_blob.lock().await else {
// This case happen when restarting the node, `prev_height_blob` intial value in
// `None`.
debug!("No blob to write to Aerospike.");
sender.send(false).expect("Writing to a one-shot sender should succeed.");
oneshot_send(sender, false);
return;
};
// TODO(dvir): write blob to AS.
// TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS.
debug!("Writing blob to Aerospike.");
sender.send(true).expect("Writing to a one-shot sender should succeed.");
debug!("Blob writing to Aerospike completed.");
match request_builder.json(blob).send().await {
Ok(response) => {
if response.status().is_success() {
debug!("Blob written to Aerospike successfully.");
oneshot_send(sender, true);
} else {
debug!("The recorder failed to write blob. Error: {}", response.status());
oneshot_send(sender, false);
}
}
Err(err) => {
debug!("Failed to send a request to the recorder. Error: {}", err);
// TODO(dvir): change this to `false`. The reason for the current value is to
// make the `end_to_end_flow_test` to pass.
oneshot_send(sender, true);
}
}
});

receiver
return receiver;

// Helper function to send a boolean result to a one-shot sender.
fn oneshot_send(sender: oneshot::Sender<bool>, result: bool) {
sender.send(result).expect("Writing to a one-shot sender should succeed.");
}
}

async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters) {
Expand Down
8 changes: 6 additions & 2 deletions crates/starknet_consensus_manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::BTreeMap;
use papyrus_config::dumping::{append_sub_config_name, SerializeConfig};
use papyrus_config::{ParamPath, SerializedParam};
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus_orchestrator::cende::CendeConfig;
use serde::{Deserialize, Serialize};
use validator::Validate;

Expand All @@ -11,12 +12,15 @@ use validator::Validate;
#[derive(Clone, Default, Debug, Serialize, Deserialize, Validate, PartialEq)]
pub struct ConsensusManagerConfig {
pub consensus_config: ConsensusConfig,
pub cende_config: CendeConfig,
}

impl SerializeConfig for ConsensusManagerConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
let sub_configs =
vec![append_sub_config_name(self.consensus_config.dump(), "consensus_config")];
let sub_configs = vec![
append_sub_config_name(self.consensus_config.dump(), "consensus_config"),
append_sub_config_name(self.cende_config.dump(), "cende_config"),
];

sub_configs.into_iter().flatten().collect()
}
Expand Down
2 changes: 1 addition & 1 deletion crates/starknet_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl ConsensusManager {
votes_broadcast_channels.broadcast_topic_client.clone(),
self.config.consensus_config.num_validators,
self.config.consensus_config.chain_id.clone(),
Arc::new(CendeAmbassador::new()),
Arc::new(CendeAmbassador::new(self.config.cende_config.clone())),
);

let mut network_handle = tokio::task::spawn(network_manager.run());
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ strum.workspace = true
tempfile.workspace = true
tokio.workspace = true
tracing.workspace = true
url.workspace = true

[dev-dependencies]
futures.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_integration_tests/src/config_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub(crate) fn dump_config_file_changes(
required_params.eth_fee_token_address,
required_params.strk_fee_token_address,
required_params.validator_id,
required_params.recorder_url,
);

// Create the entire mapping of the config and the pointers, without the required params.
Expand Down
4 changes: 4 additions & 0 deletions crates/starknet_integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use starknet_sequencer_node::config::node_config::SequencerNodeConfig;
use starknet_sequencer_node::config::test_utils::RequiredParams;
use starknet_state_sync::config::StateSyncConfig;
use starknet_types_core::felt::Felt;
use url::Url;

pub fn create_chain_info() -> ChainInfo {
let mut chain_info = ChainInfo::create_for_testing();
Expand Down Expand Up @@ -92,6 +93,8 @@ pub async fn create_node_config(
eth_fee_token_address: fee_token_addresses.eth_fee_token_address,
strk_fee_token_address: fee_token_addresses.strk_fee_token_address,
validator_id,
// TODO(dvir): change this to real value when add recorder to integration tests.
recorder_url: Url::parse("https://recorder_url").expect("The URL is valid"),
},
)
}
Expand Down Expand Up @@ -129,6 +132,7 @@ pub fn create_consensus_manager_configs_and_channels(
timeouts: timeouts.clone(),
..Default::default()
},
..Default::default()
})
.collect();

Expand Down
3 changes: 2 additions & 1 deletion crates/starknet_sequencer_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ repository.workspace = true
license.workspace = true

[features]
testing = []
testing = ["url"]

[lints]
workspace = true
Expand Down Expand Up @@ -42,6 +42,7 @@ starknet_state_sync.workspace = true
starknet_state_sync_types.workspace = true
tokio.workspace = true
tracing.workspace = true
url = { workspace = true, optional = true }
validator.workspace = true

[dev-dependencies]
Expand Down
8 changes: 8 additions & 0 deletions crates/starknet_sequencer_node/src/config/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ pub static CONFIG_POINTERS: LazyLock<ConfigPointers> = LazyLock::new(|| {
),
set_pointing_param_paths(&["consensus_manager_config.consensus_config.validator_id"]),
),
(
ser_pointer_target_required_param(
"recorder_url",
SerializationType::String,
"The URL of the Pythonic cende_recorder",
),
set_pointing_param_paths(&["consensus_manager_config.cende_config.recorder_url"]),
),
];
let mut common_execution_config = generate_struct_pointer(
"versioned_constants_overrides".to_owned(),
Expand Down
3 changes: 3 additions & 0 deletions crates/starknet_sequencer_node/src/config/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::vec::Vec; // Used by #[gen_field_names_fn].
use papyrus_proc_macros::gen_field_names_and_cli_args_fn;
use papyrus_protobuf::consensus::DEFAULT_VALIDATOR_ID;
use starknet_api::core::{ChainId, ContractAddress};
use url::Url;

use crate::config::node_config::node_command;

Expand All @@ -13,6 +14,7 @@ pub struct RequiredParams {
pub eth_fee_token_address: ContractAddress,
pub strk_fee_token_address: ContractAddress,
pub validator_id: ContractAddress,
pub recorder_url: Url,
}

impl RequiredParams {
Expand All @@ -22,6 +24,7 @@ impl RequiredParams {
eth_fee_token_address: ContractAddress::from(2_u128),
strk_fee_token_address: ContractAddress::from(3_u128),
validator_id: ContractAddress::from(DEFAULT_VALIDATOR_ID),
recorder_url: Url::parse("https://recorder_url").expect("The URL is valid"),
}
}
}
Expand Down

0 comments on commit 4bb9feb

Please sign in to comment.