Skip to content

Commit

Permalink
test(starknet_mempool_p2p): test gateway client error on adding tx fr…
Browse files Browse the repository at this point in the history
…om p2p
  • Loading branch information
AlonLStarkWare committed Dec 26, 2024
1 parent 13d63e5 commit 2c6dea4
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/starknet_mempool_p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ validator.workspace = true
[dev-dependencies]
futures.workspace = true
libp2p.workspace = true
mockall.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_network_types = { workspace = true, features = ["testing"] }
papyrus_protobuf.workspace = true
Expand Down
124 changes: 89 additions & 35 deletions crates/starknet_mempool_p2p/src/runner/test.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::channel::mpsc::Sender;
use futures::future::{pending, ready, BoxFuture};
use futures::stream::StreamExt;
use futures::{FutureExt, SinkExt};
Expand All @@ -17,10 +15,10 @@ use papyrus_protobuf::mempool::RpcTransactionWrapper;
use papyrus_test_utils::{get_rng, GetTestInstance};
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_gateway_types::communication::{GatewayClient, GatewayClientResult};
use starknet_gateway_types::communication::{GatewayClient, GatewayClientError, MockGatewayClient};
use starknet_gateway_types::errors::{GatewayError, GatewaySpecError};
use starknet_gateway_types::gateway_types::GatewayInput;
use starknet_sequencer_infra::component_definitions::ComponentStarter;
use tokio::time::sleep;

use super::MempoolP2pRunner;

Expand All @@ -44,8 +42,7 @@ fn setup(
#[test]
fn run_returns_when_network_future_returns() {
let network_future = ready(Ok(())).boxed();
let gateway_client =
Arc::new(MockGatewayClient { add_tx_sender: futures::channel::mpsc::channel(1).0 });
let gateway_client = Arc::new(MockGatewayClient::new());
let (mut mempool_p2p_runner, _) = setup(network_future, gateway_client);
mempool_p2p_runner.start().now_or_never().unwrap().unwrap();
}
Expand All @@ -54,53 +51,110 @@ fn run_returns_when_network_future_returns() {
fn run_returns_error_when_network_future_returns_error() {
let network_future =
ready(Err(NetworkError::DialError(libp2p::swarm::DialError::Aborted))).boxed();
let gateway_client =
Arc::new(MockGatewayClient { add_tx_sender: futures::channel::mpsc::channel(1).0 });
let gateway_client = Arc::new(MockGatewayClient::new());
let (mut mempool_p2p_runner, _) = setup(network_future, gateway_client);
mempool_p2p_runner.start().now_or_never().unwrap().unwrap_err();
}

// TODO(eitan): Make it an automock
#[derive(Clone)]
struct MockGatewayClient {
add_tx_sender: Sender<RpcTransaction>,
}
#[tokio::test]
async fn incoming_p2p_tx_reaches_gateway_client() {
let network_future = pending().boxed();

// Create channels for sending an empty message to indicate that the tx reached the gateway
// client.
let (add_tx_indicator_sender, add_tx_indicator_receiver) = futures::channel::oneshot::channel();

let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
let expected_rpc_transaction =
RpcTransactionWrapper(RpcTransaction::get_test_instance(&mut get_rng()));
let gateway_input = GatewayInput {
rpc_tx: expected_rpc_transaction.0.clone(),
message_metadata: Some(message_metadata.clone()),
};

let mut mock_gateway_client = MockGatewayClient::new();
mock_gateway_client.expect_add_tx().with(mockall::predicate::eq(gateway_input)).return_once(
move |_| {
add_tx_indicator_sender.send(()).unwrap();
Ok(TransactionHash::default())
},
);
let (mut mempool_p2p_runner, mock_network) =
setup(network_future, Arc::new(mock_gateway_client));

let BroadcastNetworkMock {
broadcasted_messages_sender: mut mock_broadcasted_messages_sender,
..
} = mock_network;

let res =
mock_broadcasted_messages_sender.send((expected_rpc_transaction.clone(), message_metadata));

#[async_trait]
impl GatewayClient for MockGatewayClient {
async fn add_tx(&self, gateway_input: GatewayInput) -> GatewayClientResult<TransactionHash> {
let _ = self.clone().add_tx_sender.send(gateway_input.rpc_tx).await;
Ok(TransactionHash::default())
res.await.expect("Failed to send message");

tokio::select! {
// if the runner fails, there was a network issue => panic.
// if the runner returns successfully, we panic because the runner should never terminate.
res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => {
res.expect("Test timed out").expect("MempoolP2pRunner failed - network stopped unexpectedly");
panic!("MempoolP2pRunner terminated");
},
// if a message was received on this oneshot channel, the gateway client received the tx and the test succeeded.
res = add_tx_indicator_receiver => {res.unwrap()}
}
}

// The p2p runner receives a tx from network, and the gateway declines it, triggering report_peer.
#[tokio::test]
async fn start_component_receive_tx_happy_flow() {
async fn incoming_p2p_tx_fails_on_gateway_client() {
let network_future = pending().boxed();
let (add_tx_sender, mut add_tx_receiver) = futures::channel::mpsc::channel(1);
let mock_gateway_client = Arc::new(MockGatewayClient { add_tx_sender });
let (mut mempool_p2p_runner, mock_network) = setup(network_future, mock_gateway_client);
// Create channels for sending an empty message to indicate that the tx reached the gateway
// client.
let (add_tx_indicator_sender, add_tx_indicator_receiver) = futures::channel::oneshot::channel();

let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
let message_metadata_clone = message_metadata.clone();
let expected_rpc_transaction =
RpcTransactionWrapper(RpcTransaction::get_test_instance(&mut get_rng()));

let mut mock_gateway_client = MockGatewayClient::new();
mock_gateway_client.expect_add_tx().return_once(move |_| {
add_tx_indicator_sender.send(()).unwrap();
Err(GatewayClientError::GatewayError(GatewayError::GatewaySpecError {
source: GatewaySpecError::DuplicateTx,
p2p_message_metadata: Some(message_metadata_clone),
}))
});

let (mut mempool_p2p_runner, mock_network) =
setup(network_future, Arc::new(mock_gateway_client));

let BroadcastNetworkMock {
broadcasted_messages_sender: mut mock_broadcasted_messages_sender,
reported_messages_receiver: mut mock_reported_messages_receiver,
..
} = mock_network;
let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng());
let expected_rpc_transaction =
RpcTransactionWrapper(RpcTransaction::get_test_instance(&mut get_rng()));

// Sending the expected transaction to the mempool receiver
let res =
mock_broadcasted_messages_sender.send((expected_rpc_transaction.clone(), message_metadata));
let res = mock_broadcasted_messages_sender
.send((expected_rpc_transaction.clone(), message_metadata.clone()));

res.await.expect("Failed to send message");

tokio::select! {
_ = mempool_p2p_runner.start() => {panic!("Mempool receiver failed to start");}
actual_rpc_transaction = add_tx_receiver.next() => {
assert_eq!(actual_rpc_transaction, Some(expected_rpc_transaction.0));
}
_ = sleep(Duration::from_secs(5)) => {
panic!("Test timed out");
// if the runner fails, there was a network issue => panic.
// if the runner returns successfully, we panic because the runner should never terminate.
res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => {
res.expect("Test timed out (MempoolP2pRunner took too long to start)").expect("MempoolP2pRunner failed - network stopped unexpectedly");
panic!("MempoolP2pRunner terminated");
},
// if a message was received on this oneshot channel, the gateway client received the tx.
res = add_tx_indicator_receiver => {
// if unwrap fails, the tx wasn't forwarded to the gateway client.
res.unwrap();
// After gateway client fails to add the tx, the p2p runner should have reported the peer.
let peer_reported = mock_reported_messages_receiver.next().await.expect("Failed to receive report");
// TODO: add this functionality to network manager test utils
assert_eq!(peer_reported, message_metadata.originator_id.private_get_peer_id())
}
}
}
// TODO(eitan): Add test for when the gateway client fails to add the transaction

0 comments on commit 2c6dea4

Please sign in to comment.