Skip to content

Commit

Permalink
feat(sync): add class channel without usage
Browse files Browse the repository at this point in the history
  • Loading branch information
eitanm-starkware committed Sep 15, 2024
1 parent e796294 commit f14afef
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 36 deletions.
3 changes: 3 additions & 0 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,10 +357,13 @@ fn register_to_network(network_config: Option<NetworkConfig>) -> anyhow::Result<
network_manager.register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE);
let class_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE);
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
transaction_client_sender,
class_client_sender,
);

let header_server_receiver = network_manager
Expand Down
31 changes: 18 additions & 13 deletions crates/papyrus_p2p_sync/src/client/header_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ async fn signed_headers_basic_flow() {
let TestArgs {
p2p_sync,
storage_reader,
mut header_receiver,
mut mock_header_response_manager,
// The test will fail if we drop these
state_diff_receiver: _state_diff_query_receiver,
transaction_receiver: _transaction_query_receiver,
mock_state_diff_response_manager: _mock_state_diff_response_manager,
mock_transaction_response_manager: _mock_transaction_response_manager,
mock_class_response_manager: _mock_class_response_manager,
..
} = setup();
let block_hashes_and_signatures =
Expand All @@ -44,7 +45,8 @@ async fn signed_headers_basic_flow() {
let end_block_number = (query_index + 1) * HEADER_QUERY_LENGTH;

// Receive query and validate it.
let mut mock_header_responses_manager = header_receiver.next().await.unwrap();
let mut mock_header_responses_manager =
mock_header_response_manager.next().await.unwrap();
assert_eq!(
*mock_header_responses_manager.query(),
Ok(HeaderQuery(Query {
Expand Down Expand Up @@ -110,17 +112,18 @@ async fn sync_sends_new_header_query_if_it_got_partial_responses() {

let TestArgs {
p2p_sync,
mut header_receiver,
mut mock_header_response_manager,
// The test will fail if we drop these
state_diff_receiver: _state_diff_query_receiver,
transaction_receiver: _transaction_query_receiver,
mock_state_diff_response_manager: _state_diff_receiver,
mock_transaction_response_manager: _transaction_receiver,
mock_class_response_manager: _class_receiver,
..
} = setup();
let block_hashes_and_signatures = create_block_hashes_and_signatures(NUM_ACTUAL_RESPONSES);

// Create a future that will receive a query, send partial responses and receive the next query.
let parse_queries_future = async move {
let mut mock_header_responses_manager = header_receiver.next().await.unwrap();
let mut mock_header_responses_manager = mock_header_response_manager.next().await.unwrap();

for (i, (block_hash, signature)) in block_hashes_and_signatures.into_iter().enumerate() {
mock_header_responses_manager
Expand All @@ -146,11 +149,13 @@ async fn sync_sends_new_header_query_if_it_got_partial_responses() {
tokio::time::resume();

// First unwrap is for the timeout. Second unwrap is for the Option returned from Stream.
let mock_header_responses_manager =
timeout(TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE, header_receiver.next())
.await
.unwrap()
.unwrap();
let mock_header_responses_manager = timeout(
TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE,
mock_header_response_manager.next(),
)
.await
.unwrap()
.unwrap();

assert_eq!(
*mock_header_responses_manager.query(),
Expand Down
9 changes: 8 additions & 1 deletion crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ use std::time::Duration;
use futures::channel::mpsc::SendError;
use futures::Stream;
use header::HeaderStreamBuilder;
use papyrus_common::pending_classes::ApiContractClass;
use papyrus_config::converters::deserialize_seconds_to_duration;
use papyrus_config::dumping::{ser_optional_param, ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use papyrus_network::network_manager::SqmrClientSender;
use papyrus_protobuf::converters::ProtobufConversionError;
use papyrus_protobuf::sync::{
ClassQuery,
DataOrFin,
HeaderQuery,
SignedBlockHeader,
Expand All @@ -31,6 +33,7 @@ use papyrus_protobuf::sync::{
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockNumber, BlockSignature};
use starknet_api::core::ClassHash;
use starknet_api::transaction::FullTransaction;
use state_diff::StateDiffStreamBuilder;
use stream_builder::{DataStreamBuilder, DataStreamResult};
Expand Down Expand Up @@ -175,20 +178,24 @@ pub enum P2PSyncClientError {
type HeaderSqmrSender = SqmrClientSender<HeaderQuery, DataOrFin<SignedBlockHeader>>;
type StateSqmrDiffSender = SqmrClientSender<StateDiffQuery, DataOrFin<StateDiffChunk>>;
type TransactionSqmrSender = SqmrClientSender<TransactionQuery, DataOrFin<FullTransaction>>;
type ClassSqmrSender = SqmrClientSender<ClassQuery, DataOrFin<(ApiContractClass, ClassHash)>>;

pub struct P2PSyncClientChannels {
header_sender: HeaderSqmrSender,
state_diff_sender: StateSqmrDiffSender,
transaction_sender: TransactionSqmrSender,
#[allow(dead_code)]
class_sender: ClassSqmrSender,
}

impl P2PSyncClientChannels {
pub fn new(
header_sender: HeaderSqmrSender,
state_diff_sender: StateSqmrDiffSender,
transaction_sender: TransactionSqmrSender,
class_sender: ClassSqmrSender,
) -> Self {
Self { header_sender, state_diff_sender, transaction_sender }
Self { header_sender, state_diff_sender, transaction_sender, class_sender }
}
pub(crate) fn create_stream(
self,
Expand Down
26 changes: 15 additions & 11 deletions crates/papyrus_p2p_sync/src/client/state_diff_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ async fn state_diff_basic_flow() {
let TestArgs {
p2p_sync,
storage_reader,
mut state_diff_receiver,
mut header_receiver,
mut mock_state_diff_response_manager,
mut mock_header_response_manager,
// The test will fail if we drop these
transaction_receiver: _mock_transaction_responses_manager,
mock_transaction_response_manager: _mock_transaction_responses_manager,
mock_class_response_manager: _mock_class_responses_manager,
..
} = setup();

Expand All @@ -69,8 +70,8 @@ async fn state_diff_basic_flow() {
tokio::time::sleep(SLEEP_DURATION_TO_LET_SYNC_ADVANCE).await;

// Check that before we send headers there is no state diff query.
assert!(state_diff_receiver.next().now_or_never().is_none());
let mut mock_header_responses_manager = header_receiver.next().await.unwrap();
assert!(mock_state_diff_response_manager.next().now_or_never().is_none());
let mut mock_header_responses_manager = mock_header_response_manager.next().await.unwrap();

// Send headers for entire query.
for (i, ((block_hash, block_signature), state_diff)) in
Expand Down Expand Up @@ -105,7 +106,8 @@ async fn state_diff_basic_flow() {
(STATE_DIFF_QUERY_LENGTH, HEADER_QUERY_LENGTH - STATE_DIFF_QUERY_LENGTH),
] {
// Get a state diff query and validate it
let mut mock_state_diff_responses_manager = state_diff_receiver.next().await.unwrap();
let mut mock_state_diff_responses_manager =
mock_state_diff_response_manager.next().await.unwrap();
assert_eq!(
*mock_state_diff_responses_manager.query(),
Ok(StateDiffQuery(Query {
Expand Down Expand Up @@ -318,10 +320,11 @@ async fn validate_state_diff_fails(
let TestArgs {
p2p_sync,
storage_reader,
mut state_diff_receiver,
mut header_receiver,
mut mock_state_diff_response_manager,
mut mock_header_response_manager,
// The test will fail if we drop these
transaction_receiver: _mock_transaction_responses_manager,
mock_transaction_response_manager: _mock_transaction_responses_manager,
mock_class_response_manager: _mock_class_responses_manager,
..
} = setup();

Expand All @@ -330,7 +333,7 @@ async fn validate_state_diff_fails(
// Create a future that will receive queries, send responses and validate the results.
let parse_queries_future = async move {
// Send a single header. There's no need to fill the entire query.
let mut mock_header_responses_manager = header_receiver.next().await.unwrap();
let mut mock_header_responses_manager = mock_header_response_manager.next().await.unwrap();
mock_header_responses_manager
.send_response(DataOrFin(Some(SignedBlockHeader {
block_header: BlockHeader {
Expand All @@ -354,7 +357,8 @@ async fn validate_state_diff_fails(
tokio::time::resume();

// Get a state diff query and validate it
let mut mock_state_diff_responses_manager = state_diff_receiver.next().await.unwrap();
let mut mock_state_diff_responses_manager =
mock_state_diff_response_manager.next().await.unwrap();
assert_eq!(
*mock_state_diff_responses_manager.query(),
Ok(StateDiffQuery(Query {
Expand Down
38 changes: 27 additions & 11 deletions crates/papyrus_p2p_sync/src/client/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::time::Duration;

use lazy_static::lazy_static;
use papyrus_common::pending_classes::ApiContractClass;
use papyrus_network::network_manager::test_utils::{
mock_register_sqmr_protocol_client,
MockClientResponsesManager,
};
use papyrus_network::network_manager::GenericReceiver;
use papyrus_protobuf::sync::{
ClassQuery,
DataOrFin,
HeaderQuery,
SignedBlockHeader,
Expand All @@ -17,6 +19,7 @@ use papyrus_protobuf::sync::{
use papyrus_storage::test_utils::get_test_storage;
use papyrus_storage::StorageReader;
use starknet_api::block::{BlockHash, BlockSignature};
use starknet_api::core::ClassHash;
use starknet_api::crypto::utils::Signature;
use starknet_api::hash::StarkHash;
use starknet_api::transaction::FullTransaction;
Expand Down Expand Up @@ -47,28 +50,40 @@ type HeaderTestPayload = MockClientResponsesManager<HeaderQuery, DataOrFin<Signe
type StateDiffTestPayload = MockClientResponsesManager<StateDiffQuery, DataOrFin<StateDiffChunk>>;
type TransactionTestPayload =
MockClientResponsesManager<TransactionQuery, DataOrFin<FullTransaction>>;
type ClassTestPayload =
MockClientResponsesManager<ClassQuery, DataOrFin<(ApiContractClass, ClassHash)>>;

// TODO(Eitan): Use SqmrSubscriberChannels once there is a utility function for testing
pub struct TestArgs {
#[allow(clippy::type_complexity)]
pub p2p_sync: P2PSyncClient,
pub storage_reader: StorageReader,
pub header_receiver: GenericReceiver<HeaderTestPayload>,
pub state_diff_receiver: GenericReceiver<StateDiffTestPayload>,
pub mock_header_response_manager: GenericReceiver<HeaderTestPayload>,
pub mock_state_diff_response_manager: GenericReceiver<StateDiffTestPayload>,
#[allow(dead_code)]
pub transaction_receiver: GenericReceiver<TransactionTestPayload>,
pub mock_transaction_response_manager: GenericReceiver<TransactionTestPayload>,
#[allow(dead_code)]
pub mock_class_response_manager: GenericReceiver<ClassTestPayload>,
}

pub fn setup() -> TestArgs {
let p2p_sync_config = *TEST_CONFIG;
let buffer_size = p2p_sync_config.buffer_size;
let ((storage_reader, storage_writer), _temp_dir) = get_test_storage();
let (header_sender, header_receiver) = mock_register_sqmr_protocol_client(buffer_size);
let (state_diff_sender, state_diff_receiver) = mock_register_sqmr_protocol_client(buffer_size);
let (transaction_sender, transaction_receiver) =
let (header_sender, mock_header_response_manager) =
mock_register_sqmr_protocol_client(buffer_size);
let (state_diff_sender, mock_state_diff_response_manager) =
mock_register_sqmr_protocol_client(buffer_size);
let (transaction_sender, mock_transaction_response_manager) =
mock_register_sqmr_protocol_client(buffer_size);
let p2p_sync_channels =
P2PSyncClientChannels { header_sender, state_diff_sender, transaction_sender };
let (class_sender, mock_class_response_manager) =
mock_register_sqmr_protocol_client(buffer_size);
let p2p_sync_channels = P2PSyncClientChannels {
header_sender,
state_diff_sender,
transaction_sender,
class_sender,
};
let p2p_sync = P2PSyncClient::new(
p2p_sync_config,
storage_reader.clone(),
Expand All @@ -78,9 +93,10 @@ pub fn setup() -> TestArgs {
TestArgs {
p2p_sync,
storage_reader,
header_receiver,
state_diff_receiver,
transaction_receiver,
mock_header_response_manager,
mock_state_diff_response_manager,
mock_transaction_response_manager,
mock_class_response_manager,
}
}

Expand Down

0 comments on commit f14afef

Please sign in to comment.