diff --git a/crates/starknet_batcher_types/src/communication.rs b/crates/starknet_batcher_types/src/communication.rs index 59f7a19982..838d37df1c 100644 --- a/crates/starknet_batcher_types/src/communication.rs +++ b/crates/starknet_batcher_types/src/communication.rs @@ -100,7 +100,7 @@ pub enum BatcherClientError { impl BatcherClient for LocalBatcherClient { async fn build_proposal(&self, input: BuildProposalInput) -> BatcherClientResult<()> { let request = BatcherRequest::BuildProposal(input); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!(BatcherResponse, BuildProposal, BatcherClientError, BatcherError) } @@ -109,7 +109,7 @@ impl BatcherClient for LocalBatcherClient { input: GetProposalContentInput, ) -> BatcherClientResult { let request = BatcherRequest::GetProposalContent(input); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!( BatcherResponse, GetProposalContent, @@ -120,7 +120,7 @@ impl BatcherClient for LocalBatcherClient { async fn validate_proposal(&self, input: ValidateProposalInput) -> BatcherClientResult<()> { let request = BatcherRequest::ValidateProposal(input); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!( BatcherResponse, ValidateProposal, @@ -134,7 +134,7 @@ impl BatcherClient for LocalBatcherClient { input: SendProposalContentInput, ) -> BatcherClientResult { let request = BatcherRequest::SendProposalContent(input); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!( BatcherResponse, SendProposalContent, @@ -145,13 +145,13 @@ impl BatcherClient for LocalBatcherClient { async fn start_height(&self, input: StartHeightInput) -> BatcherClientResult<()> { let request = BatcherRequest::StartHeight(input); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!(BatcherResponse, StartHeight, BatcherClientError, BatcherError) } async fn decision_reached(&self, input: DecisionReachedInput) -> BatcherClientResult<()> { let request = BatcherRequest::DecisionReached(input); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!( BatcherResponse, DecisionReached, diff --git a/crates/starknet_gateway_types/src/communication.rs b/crates/starknet_gateway_types/src/communication.rs index 8fb42c2f1f..72cf01fc36 100644 --- a/crates/starknet_gateway_types/src/communication.rs +++ b/crates/starknet_gateway_types/src/communication.rs @@ -56,13 +56,8 @@ impl GatewayClient for LocalGatewayClient { #[instrument(skip(self))] async fn add_tx(&self, gateway_input: GatewayInput) -> GatewayClientResult { let request = GatewayRequest::AddTransaction(gateway_input); - let response = self.send(request).await; - match response { - GatewayResponse::AddTransaction(Ok(response)) => Ok(response), - GatewayResponse::AddTransaction(Err(response)) => { - Err(GatewayClientError::GatewayError(response)) - } - } + let response = self.send(request).await?; + handle_response_variants!(GatewayResponse, AddTransaction, GatewayClientError, GatewayError) } } diff --git a/crates/starknet_mempool_p2p_types/src/communication.rs b/crates/starknet_mempool_p2p_types/src/communication.rs index 962bfe9bad..b5c58dd4ef 100644 --- a/crates/starknet_mempool_p2p_types/src/communication.rs +++ b/crates/starknet_mempool_p2p_types/src/communication.rs @@ -69,7 +69,7 @@ impl MempoolP2pPropagatorClient for LocalMempoolP2pPropagatorClient { transaction: RpcTransaction, ) -> MempoolP2pPropagatorClientResult<()> { let request = MempoolP2pPropagatorRequest::AddTransaction(transaction); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!( MempoolP2pPropagatorResponse, AddTransaction, @@ -83,7 +83,7 @@ impl MempoolP2pPropagatorClient for LocalMempoolP2pPropagatorClient { propagation_metadata: BroadcastedMessageMetadata, ) -> MempoolP2pPropagatorClientResult<()> { let request = MempoolP2pPropagatorRequest::ContinuePropagation(propagation_metadata); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!( MempoolP2pPropagatorResponse, ContinuePropagation, @@ -114,10 +114,7 @@ impl MempoolP2pPropagatorClient for RemoteMempoolP2pPropagatorClient { propagation_metadata: BroadcastedMessageMetadata, ) -> MempoolP2pPropagatorClientResult<()> { let request = MempoolP2pPropagatorRequest::ContinuePropagation(propagation_metadata); - let response = match self.send(request).await { - Ok(resp) => resp, - Err(client_error) => return Err(MempoolP2pPropagatorClientError::from(client_error)), - }; + let response = self.send(request).await?; handle_response_variants!( MempoolP2pPropagatorResponse, ContinuePropagation, diff --git a/crates/starknet_mempool_types/src/communication.rs b/crates/starknet_mempool_types/src/communication.rs index 0c1a27111d..700933a1d7 100644 --- a/crates/starknet_mempool_types/src/communication.rs +++ b/crates/starknet_mempool_types/src/communication.rs @@ -70,19 +70,19 @@ pub enum MempoolClientError { impl MempoolClient for LocalMempoolClient { async fn add_tx(&self, args: AddTransactionArgsWrapper) -> MempoolClientResult<()> { let request = MempoolRequest::AddTransaction(args); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!(MempoolResponse, AddTransaction, MempoolClientError, MempoolError) } async fn commit_block(&self, args: CommitBlockArgs) -> MempoolClientResult<()> { let request = MempoolRequest::CommitBlock(args); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!(MempoolResponse, CommitBlock, MempoolClientError, MempoolError) } async fn get_txs(&self, n_txs: usize) -> MempoolClientResult> { let request = MempoolRequest::GetTransactions(n_txs); - let response = self.send(request).await; + let response = self.send(request).await?; handle_response_variants!( MempoolResponse, GetTransactions, diff --git a/crates/starknet_sequencer_infra/src/component_client/local_component_client.rs b/crates/starknet_sequencer_infra/src/component_client/local_component_client.rs index 477929e808..0a86518196 100644 --- a/crates/starknet_sequencer_infra/src/component_client/local_component_client.rs +++ b/crates/starknet_sequencer_infra/src/component_client/local_component_client.rs @@ -3,6 +3,7 @@ use std::any::type_name; use tokio::sync::mpsc::{channel, Sender}; use tracing::info; +use crate::component_client::ClientResult; use crate::component_definitions::ComponentRequestAndResponseSender; /// The `LocalComponentClient` struct is a generic client for sending component requests and @@ -71,11 +72,11 @@ where Self { tx } } - pub async fn send(&self, request: Request) -> Response { + pub async fn send(&self, request: Request) -> ClientResult { let (res_tx, mut res_rx) = channel::(1); let request_and_res_tx = ComponentRequestAndResponseSender { request, tx: res_tx }; self.tx.send(request_and_res_tx).await.expect("Outbound connection should be open."); - res_rx.recv().await.expect("Inbound connection should be open.") + Ok(res_rx.recv().await.expect("Inbound connection should be open.")) } } diff --git a/crates/starknet_sequencer_infra/src/component_server/remote_component_server.rs b/crates/starknet_sequencer_infra/src/component_server/remote_component_server.rs index 06deb9af1f..65e9080ffe 100644 --- a/crates/starknet_sequencer_infra/src/component_server/remote_component_server.rs +++ b/crates/starknet_sequencer_infra/src/component_server/remote_component_server.rs @@ -131,14 +131,22 @@ where { Ok(request) => { let response = local_client.send(request).await; - HyperResponse::builder() - .status(StatusCode::OK) - .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM) - .body(Body::from( - BincodeSerdeWrapper::new(response) - .to_bincode() - .expect("Response serialization should succeed"), - )) + match response { + Ok(response) => HyperResponse::builder() + .status(StatusCode::OK) + .header(CONTENT_TYPE, APPLICATION_OCTET_STREAM) + .body(Body::from( + BincodeSerdeWrapper::new(response) + .to_bincode() + .expect("Response serialization should succeed"), + )), + Err(error) => { + panic!( + "Remote server failed sending with its local client. Error: {:?}", + error + ); + } + } } Err(error) => { let server_error = ServerError::RequestDeserializationFailure(error.to_string()); diff --git a/crates/starknet_sequencer_infra/src/tests/local_component_client_server_test.rs b/crates/starknet_sequencer_infra/src/tests/local_component_client_server_test.rs index b23b74a925..1eba7b07c3 100644 --- a/crates/starknet_sequencer_infra/src/tests/local_component_client_server_test.rs +++ b/crates/starknet_sequencer_infra/src/tests/local_component_client_server_test.rs @@ -29,7 +29,7 @@ type ComponentBClient = LocalComponentClient { async fn a_get_value(&self) -> ResultA { let res = self.send(ComponentARequest::AGetValue).await; - match res { + match res? { ComponentAResponse::AGetValue(value) => Ok(value), } } @@ -39,7 +39,7 @@ impl ComponentAClientTrait for LocalComponentClient { async fn b_get_value(&self) -> ResultB { let res = self.send(ComponentBRequest::BGetValue).await; - match res { + match res? { ComponentBResponse::BGetValue(value) => Ok(value), unexpected_response => { Err(ClientError::UnexpectedResponse(format!("{unexpected_response:?}"))) @@ -48,7 +48,8 @@ impl ComponentBClientTrait for LocalComponentClient ClientResult<()> { - match self.send(ComponentBRequest::BSetValue(value)).await { + let res = self.send(ComponentBRequest::BSetValue(value)).await; + match res? { ComponentBResponse::BSetValue => Ok(()), unexpected_response => { Err(ClientError::UnexpectedResponse(format!("{unexpected_response:?}")))