From 821ff5ae7c0c33c8828a4353e5c60e56a404f7b7 Mon Sep 17 00:00:00 2001 From: Lucas Soriano Date: Wed, 9 Jan 2019 17:30:24 +1100 Subject: [PATCH] Add inbound connections to client pool --- api_tests/dry/rfc003/test.js | 43 ++++++++++++++++++- application/comit_node/src/bin/comit_node.rs | 19 +++++--- .../comit_node/src/comit_client/bam.rs | 6 +++ .../comit_node/src/comit_client/fake.rs | 1 + .../comit_node/src/comit_client/mod.rs | 1 + application/comit_node/src/comit_server.rs | 17 +++++++- 6 files changed, 79 insertions(+), 8 deletions(-) diff --git a/api_tests/dry/rfc003/test.js b/api_tests/dry/rfc003/test.js index 7b69718336..cfaea3b25f 100644 --- a/api_tests/dry/rfc003/test.js +++ b/api_tests/dry/rfc003/test.js @@ -24,6 +24,7 @@ const bob = actor.create("bob"); const charlie = actor.create("charlie"); const alice_final_address = "0x00a329c0648769a73afac7f9381e08fb43dbea72"; +const alice_comit_node_address = alice.config.comit.comit_listen; const bob_comit_node_address = bob.config.comit.comit_listen; const charlie_comit_node_address = charlie.config.comit.comit_listen; @@ -101,6 +102,16 @@ describe("RFC003 HTTP API", () => { }); }); + it("[Bob] Should have no peers before receiving a swap request from Alice", async () => { + await chai + .request(alice.comit_node_url()) + .get("/peers") + .then(res => { + res.should.have.status(200); + res.body.peers.should.have.length(0); + }); + }); + let alice_reasonable_swap_href; it("[Alice] Should be able to make first swap request via HTTP api", async () => { await chai @@ -136,7 +147,7 @@ describe("RFC003 HTTP API", () => { }); }); - it("[Alice] Should see Bob's IP in her list of peers after sending a swap request to him", async () => { + it("[Alice] Should see Bob in her list of peers after sending a swap request to him", async () => { await chai .request(alice.comit_node_url()) .get("/peers") @@ -146,6 +157,16 @@ describe("RFC003 HTTP API", () => { }); }); + it("[Bob] Should see a new peer in his list of peers after receiving a swap request from Alice", async () => { + await chai + .request(bob.comit_node_url()) + .get("/peers") + .then(res => { + res.should.have.status(200); + res.body.peers.should.have.length(1); + }); + }); + let alice_stingy_swap_href; it("[Alice] Should be able to make second swap request via HTTP api", async () => { await chai @@ -181,6 +202,26 @@ describe("RFC003 HTTP API", () => { }); }); + it("[Alice] Should still only see Bob in her list of peers after sending a second swap request to him", async () => { + await chai + .request(alice.comit_node_url()) + .get("/peers") + .then(res => { + res.should.have.status(200); + res.body.peers.should.eql([bob_comit_node_address]); + }); + }); + + it("[Bob] Should still only see one peer in his list of peers after receiving a second swap request from Alice", async () => { + await chai + .request(bob.comit_node_url()) + .get("/peers") + .then(res => { + res.should.have.status(200); + res.body.peers.should.have.length(1); + }); + }); + it("[Alice] Is able to GET the swap after POSTing it", async () => { await chai .request(alice.comit_node_url()) diff --git a/application/comit_node/src/bin/comit_node.rs b/application/comit_node/src/bin/comit_node.rs index f97b0f47a8..570390d975 100755 --- a/application/comit_node/src/bin/comit_node.rs +++ b/application/comit_node/src/bin/comit_node.rs @@ -5,7 +5,8 @@ extern crate log; use comit_node::{ - comit_client, comit_server, + comit_client::{self, bam::BamClientPool}, + comit_server, http_api::route_factory, ledger_query_service::DefaultLedgerQueryServiceApiClient, logging, @@ -49,7 +50,12 @@ fn main() -> Result<(), failure::Error> { &mut runtime, ); - spawn_comit_server(&settings, dependencies.clone(), &mut runtime); + spawn_comit_server( + &settings, + dependencies.clone(), + Arc::clone(&comit_client_factory), + &mut runtime, + ); // Block the current thread. ::std::thread::park(); @@ -121,12 +127,15 @@ fn spawn_warp_instance( fn spawn_comit_server( settings: &ComitNodeSettings, bob_spawner: Arc, + client_factory: Arc, runtime: &mut tokio::runtime::Runtime, ) { runtime.spawn( - comit_server::listen(settings.comit.comit_listen, bob_spawner).map_err(|e| { - error!("ComitServer shutdown: {:?}", e); - }), + comit_server::listen(settings.comit.comit_listen, bob_spawner, client_factory).map_err( + |e| { + error!("ComitServer shutdown: {:?}", e); + }, + ), ); } diff --git a/application/comit_node/src/comit_client/bam.rs b/application/comit_node/src/comit_client/bam.rs index 3f8476a380..762eaf5972 100644 --- a/application/comit_node/src/comit_client/bam.rs +++ b/application/comit_node/src/comit_client/bam.rs @@ -203,6 +203,12 @@ impl ClientFactory for BamClientPool { } } } + fn add_client(&self, comit_node_socket_addr: SocketAddr, client: Arc) { + debug!("Adding {:?} to list of peers", comit_node_socket_addr); + let mut clients = self.clients.write().unwrap(); + + clients.insert(comit_node_socket_addr, client); + } } impl ClientPool for BamClientPool { diff --git a/application/comit_node/src/comit_client/fake.rs b/application/comit_node/src/comit_client/fake.rs index 7db5b9af0a..9f3476d64f 100644 --- a/application/comit_node/src/comit_client/fake.rs +++ b/application/comit_node/src/comit_client/fake.rs @@ -92,4 +92,5 @@ impl ClientFactory for FakeClientFactory { ) -> Result, ClientFactoryError> { Ok(Arc::clone(&self.fake_client)) } + fn add_client(&self, _comit_node_socket_addr: SocketAddr, _bam_client: Arc) {} } diff --git a/application/comit_node/src/comit_client/mod.rs b/application/comit_node/src/comit_client/mod.rs index e10e3532e7..d43e842bc1 100644 --- a/application/comit_node/src/comit_client/mod.rs +++ b/application/comit_node/src/comit_client/mod.rs @@ -25,6 +25,7 @@ pub trait Client: Send + Sync + 'static { pub trait ClientFactory: Send + Sync + Debug { fn client_for(&self, comit_node_socket_addr: SocketAddr) -> Result, ClientFactoryError>; + fn add_client(&self, comit_node_socket_addr: SocketAddr, client: Arc); } pub trait ClientPool: Send + Sync + Debug + 'static { diff --git a/application/comit_node/src/comit_server.rs b/application/comit_node/src/comit_server.rs index c2ff62f5cf..06d71d2ca2 100644 --- a/application/comit_node/src/comit_server.rs +++ b/application/comit_node/src/comit_server.rs @@ -1,4 +1,11 @@ -use crate::{bam_api::rfc003::swap_config, swap_protocols::rfc003::bob::BobSpawner}; +use crate::{ + bam_api::rfc003::swap_config, + comit_client::{ + bam::{BamClient, BamClientPool}, + ClientFactory, + }, + swap_protocols::rfc003::bob::BobSpawner, +}; use bam::{connection::Connection, json}; use futures::{Future, Stream}; use std::{io, net::SocketAddr, sync::Arc}; @@ -7,6 +14,7 @@ use tokio::{self, net::TcpListener}; pub fn listen( addr: SocketAddr, bob_spawner: Arc, + bam_client_pool: Arc, ) -> impl Future { info!("ComitServer listening at {:?}", addr); let socket = TcpListener::bind(&addr).unwrap(); @@ -18,7 +26,12 @@ pub fn listen( let config = swap_config(Arc::clone(&bob_spawner)); let connection = Connection::new(config, codec, connection); - let (close_future, _client) = connection.start::(); + let (close_future, client) = connection.start::(); + + if let Ok(addr) = peer_addr { + let bam_client = Arc::new(BamClient::new(addr, client)); + bam_client_pool.add_client(addr, bam_client); + } tokio::spawn(close_future.then(move |result| { match result {