Skip to content

Commit

Permalink
test: components communication using http with hyper
Browse files Browse the repository at this point in the history
  • Loading branch information
uriel-starkware committed Jun 23, 2024
1 parent ee9d00d commit f11d5b0
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 4 deletions.
13 changes: 12 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ axum = "0.6.12"
blockifier = { git = "https://github.com/starkware-libs/blockifier.git", branch = "main-mempool", features = [
"testing",
] }
bincode = "1.3.3"
cairo-lang-sierra = "2.6.0"
cairo-lang-starknet-classes = "2.6.0"
cairo-lang-utils = "2.6.0"
Expand All @@ -47,7 +48,7 @@ colored = "2.1.0"
const_format = "0.2.30"
derive_more = "0.99"
futures = "0.3.30"
hyper = { version = "0.14", features = ["client", "http1"] }
hyper = { version = "0.14", features = ["client", "http1", "http2"] }
indexmap = "2.1.0"
itertools = "0.13.0"
# TODO(YaelD, 28/5/2024): The special Papyrus version is needed in order to be aligned with the
Expand All @@ -57,6 +58,7 @@ papyrus_config = { git = "https://github.com/starkware-libs/papyrus.git", rev =
papyrus_rpc = { git = "https://github.com/starkware-libs/papyrus.git", rev = "5d37fc32" }
papyrus_storage = { git = "https://github.com/starkware-libs/papyrus.git", rev = "5d37fc32" }
pretty_assertions = "1.4.0"
prost = "0.12.6"
reqwest = { version = "0.11", features = ["blocking", "json"] }
rstest = "0.17.0"
serde = { version = "1.0.193", features = ["derive"] }
Expand All @@ -71,5 +73,6 @@ tempfile = "3.3.0"
thiserror = "1.0"
tokio = { version = "1.37.0", features = ["full"] }
tokio-test = "0.4.4"
tonic = "0.11.0"
url = "2.5.0"
validator = "0.12"
6 changes: 4 additions & 2 deletions crates/mempool_infra/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ workspace = true

[dependencies]
async-trait.workspace = true
bincode.workspace = true
serde.workspace = true
hyper.workspace = true
papyrus_config.workspace = true
prost.workspace = true
thiserror.workspace = true
tokio.workspace = true
tonic = "0.11.0"
prost = "0.12.6"
tonic.workspace = true

[dev-dependencies]
assert_matches.workspace = true
Expand Down
278 changes: 278 additions & 0 deletions crates/mempool_infra/tests/component_server_client_http_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
mod common;

use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;

use async_trait::async_trait;
use common::{ComponentATrait, ComponentBTrait};
use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client, Request, Response, Server, Uri};
use serde::{Deserialize, Serialize};
use starknet_mempool_infra::component_definitions::ComponentRequestHandler;
use tokio::sync::Mutex;
use tokio::task;

use crate::common::{ComponentA, ComponentB, ValueA, ValueB};

// Todo(uriel): Move to common
#[derive(Serialize, Deserialize, Debug)]
pub enum ComponentARequest {
AGetValue,
}

// Todo(uriel): Move to common
#[derive(Serialize, Deserialize, Debug)]
pub enum ComponentAResponse {
Value(ValueA),
}

// Todo(uriel): Make generic - ComponentClientHttp<Component>
struct ComponentAClientHttp {
uri: Uri,
}

impl ComponentAClientHttp {
pub fn new(ip_address: IpAddr, port: u16) -> Self {
let uri = match ip_address {
IpAddr::V4(ip_address) => format!("http://{}:{}/", ip_address, port).parse().unwrap(),
IpAddr::V6(ip_address) => format!("http://[{}]:{}/", ip_address, port).parse().unwrap(),
};
Self { uri }
}
}

// Todo(uriel): Change the component trait to client specific and make it return result
#[async_trait]
impl ComponentATrait for ComponentAClientHttp {
async fn a_get_value(&self) -> ValueA {
let component_request = ComponentARequest::AGetValue;
let http_request = Request::post(self.uri.clone())
.header("Content-Type", "application/octet-stream")
.body(Body::from(
bincode::serialize(&component_request)
.expect("Request serialization should succeed"),
))
.expect("Request builidng should succeed");

// Todo(uriel): Add configuration to control number of retries
let http_response =
Client::new().request(http_request).await.expect("Could not connect to server");
let body_bytes = hyper::body::to_bytes(http_response.into_body())
.await
.expect("Could not get response from server");
match bincode::deserialize(&body_bytes).expect("Response deserialization should succeed") {
ComponentAResponse::Value(value) => value,
}
}
}

#[async_trait]
impl ComponentRequestHandler<ComponentARequest, ComponentAResponse> for ComponentA {
async fn handle_request(&mut self, request: ComponentARequest) -> ComponentAResponse {
match request {
ComponentARequest::AGetValue => ComponentAResponse::Value(self.a_get_value().await),
}
}
}

struct ComponentAServerHttp {
socket: SocketAddr,
component: Arc<Mutex<ComponentA>>,
}

impl ComponentAServerHttp {
pub fn new(component: ComponentA, ip_address: IpAddr, port: u16) -> Self {
Self {
component: Arc::new(Mutex::new(component)),
socket: SocketAddr::new(ip_address, port),
}
}

pub async fn start(&mut self) {
let make_svc = make_service_fn(|_conn| {
let component = Arc::clone(&self.component);
async {
Ok::<_, hyper::Error>(service_fn(move |req| {
Self::handler(req, Arc::clone(&component))
}))
}
});

Server::bind(&self.socket.clone()).serve(make_svc).await.unwrap();
}

async fn handler(
http_request: Request<Body>,
component: Arc<Mutex<ComponentA>>,
) -> Result<Response<Body>, hyper::Error> {
let body_bytes = hyper::body::to_bytes(http_request.into_body()).await?;
let component_request: ComponentARequest =
bincode::deserialize(&body_bytes).expect("Request deserialization should succeed");

// Scoping is for releasing lock early (otherwise, component is locked until end of
// function)
let component_response;
{
let mut component_guard = component.lock().await;
component_response = component_guard.handle_request(component_request).await;
}
let http_response = Response::builder()
.header(CONTENT_TYPE, "application/octet-stream")
.body(Body::from(
bincode::serialize(&component_response)
.expect("Response serialization should succeed"),
))
.expect("Response builidng should succeed");

Ok(http_response)
}
}

// Todo(uriel): Move to common
#[derive(Serialize, Deserialize, Debug)]
pub enum ComponentBRequest {
BGetValue,
}

// Todo(uriel): Move to common
#[derive(Serialize, Deserialize, Debug)]
pub enum ComponentBResponse {
Value(ValueB),
}

// Todo(uriel): Make generic - ComponentClientHttp<Component>
struct ComponentBClientHttp {
uri: Uri,
}

impl ComponentBClientHttp {
pub fn new(ip_address: IpAddr, port: u16) -> Self {
let uri = match ip_address {
IpAddr::V4(ip_address) => format!("http://{}:{}/", ip_address, port).parse().unwrap(),
IpAddr::V6(ip_address) => format!("http://[{}]:{}/", ip_address, port).parse().unwrap(),
};
Self { uri }
}
}

// Todo(uriel): Change the component trait to client specific and make it return result
#[async_trait]
impl ComponentBTrait for ComponentBClientHttp {
async fn b_get_value(&self) -> ValueB {
let component_request = ComponentBRequest::BGetValue;
let http_request = Request::post(self.uri.clone())
.header("Content-Type", "application/octet-stream")
.body(Body::from(
bincode::serialize(&component_request)
.expect("Request serialization should succeed"),
))
.expect("Request builidng should succeed");

// Todo(uriel): Add configuration to control number of retries
let http_response =
Client::new().request(http_request).await.expect("Could not connect to server");
let body_bytes = hyper::body::to_bytes(http_response.into_body())
.await
.expect("Could not get response from server");
match bincode::deserialize(&body_bytes).expect("Response deserialization should succeed") {
ComponentBResponse::Value(value) => value,
}
}
}

#[async_trait]
impl ComponentRequestHandler<ComponentBRequest, ComponentBResponse> for ComponentB {
async fn handle_request(&mut self, request: ComponentBRequest) -> ComponentBResponse {
match request {
ComponentBRequest::BGetValue => ComponentBResponse::Value(self.b_get_value().await),
}
}
}

struct ComponentBServerHttp {
socket: SocketAddr,
component: Arc<Mutex<ComponentB>>,
}

impl ComponentBServerHttp {
pub fn new(component: ComponentB, ip_address: IpAddr, port: u16) -> Self {
Self {
component: Arc::new(Mutex::new(component)),
socket: SocketAddr::new(ip_address, port),
}
}

pub async fn start(&mut self) {
let make_svc = make_service_fn(|_conn| {
let component = Arc::clone(&self.component);
async {
Ok::<_, hyper::Error>(service_fn(move |req| {
Self::handler(req, Arc::clone(&component))
}))
}
});

Server::bind(&self.socket.clone()).serve(make_svc).await.unwrap();
}

async fn handler(
http_request: Request<Body>,
component: Arc<Mutex<ComponentB>>,
) -> Result<Response<Body>, hyper::Error> {
let body_bytes = hyper::body::to_bytes(http_request.into_body()).await?;
let component_request: ComponentBRequest =
bincode::deserialize(&body_bytes).expect("Request deserialization should succeed");

// Scoping is for releasing lock early (otherwise, component is locked until end of
// function)
let component_response;
{
let mut component_guard = component.lock().await;
component_response = component_guard.handle_request(component_request).await;
}
let http_response = Response::builder()
.header(CONTENT_TYPE, "application/octet-stream")
.body(Body::from(bincode::serialize(&component_response).unwrap()))
.expect("Response builidng should succeed");

Ok(http_response)
}
}

async fn verify_response(ip_address: IpAddr, port: u16, expected_value: ValueA) {
let a_client = ComponentAClientHttp::new(ip_address, port);
assert_eq!(a_client.a_get_value().await, expected_value);
}

#[tokio::test]
async fn test_setup() {
let setup_value: ValueB = 90;
let expected_value: ValueA = setup_value.into();

let local_ip = "::1".parse().unwrap();
let a_port = 10000;
let b_port = 10001;

let a_client = ComponentAClientHttp::new(local_ip, a_port);
let b_client = ComponentBClientHttp::new(local_ip, b_port);

let component_a = ComponentA::new(Box::new(b_client));
let component_b = ComponentB::new(setup_value, Box::new(a_client));

let mut component_a_server = ComponentAServerHttp::new(component_a, local_ip, a_port);
let mut component_b_server = ComponentBServerHttp::new(component_b, local_ip, b_port);

task::spawn(async move {
component_a_server.start().await;
});

task::spawn(async move {
component_b_server.start().await;
});

// Todo(uriel): Get rid of this
task::yield_now().await;

verify_response(local_ip, a_port, expected_value).await;
}

0 comments on commit f11d5b0

Please sign in to comment.