Skip to content
This repository has been archived by the owner on Mar 23, 2021. It is now read-only.

Commit

Permalink
Add inbound connections to client pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Lucas Soriano committed Jan 14, 2019
1 parent f482a95 commit 821ff5a
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 8 deletions.
43 changes: 42 additions & 1 deletion api_tests/dry/rfc003/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const bob = actor.create("bob");
const charlie = actor.create("charlie");

const alice_final_address = "0x00a329c0648769a73afac7f9381e08fb43dbea72";
const alice_comit_node_address = alice.config.comit.comit_listen;
const bob_comit_node_address = bob.config.comit.comit_listen;
const charlie_comit_node_address = charlie.config.comit.comit_listen;

Expand Down Expand Up @@ -101,6 +102,16 @@ describe("RFC003 HTTP API", () => {
});
});

it("[Bob] Should have no peers before receiving a swap request from Alice", async () => {
await chai
.request(alice.comit_node_url())
.get("/peers")
.then(res => {
res.should.have.status(200);
res.body.peers.should.have.length(0);
});
});

let alice_reasonable_swap_href;
it("[Alice] Should be able to make first swap request via HTTP api", async () => {
await chai
Expand Down Expand Up @@ -136,7 +147,7 @@ describe("RFC003 HTTP API", () => {
});
});

it("[Alice] Should see Bob's IP in her list of peers after sending a swap request to him", async () => {
it("[Alice] Should see Bob in her list of peers after sending a swap request to him", async () => {
await chai
.request(alice.comit_node_url())
.get("/peers")
Expand All @@ -146,6 +157,16 @@ describe("RFC003 HTTP API", () => {
});
});

it("[Bob] Should see a new peer in his list of peers after receiving a swap request from Alice", async () => {
await chai
.request(bob.comit_node_url())
.get("/peers")
.then(res => {
res.should.have.status(200);
res.body.peers.should.have.length(1);
});
});

let alice_stingy_swap_href;
it("[Alice] Should be able to make second swap request via HTTP api", async () => {
await chai
Expand Down Expand Up @@ -181,6 +202,26 @@ describe("RFC003 HTTP API", () => {
});
});

it("[Alice] Should still only see Bob in her list of peers after sending a second swap request to him", async () => {
await chai
.request(alice.comit_node_url())
.get("/peers")
.then(res => {
res.should.have.status(200);
res.body.peers.should.eql([bob_comit_node_address]);
});
});

it("[Bob] Should still only see one peer in his list of peers after receiving a second swap request from Alice", async () => {
await chai
.request(bob.comit_node_url())
.get("/peers")
.then(res => {
res.should.have.status(200);
res.body.peers.should.have.length(1);
});
});

it("[Alice] Is able to GET the swap after POSTing it", async () => {
await chai
.request(alice.comit_node_url())
Expand Down
19 changes: 14 additions & 5 deletions application/comit_node/src/bin/comit_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
extern crate log;

use comit_node::{
comit_client, comit_server,
comit_client::{self, bam::BamClientPool},
comit_server,
http_api::route_factory,
ledger_query_service::DefaultLedgerQueryServiceApiClient,
logging,
Expand Down Expand Up @@ -49,7 +50,12 @@ fn main() -> Result<(), failure::Error> {
&mut runtime,
);

spawn_comit_server(&settings, dependencies.clone(), &mut runtime);
spawn_comit_server(
&settings,
dependencies.clone(),
Arc::clone(&comit_client_factory),
&mut runtime,
);

// Block the current thread.
::std::thread::park();
Expand Down Expand Up @@ -121,12 +127,15 @@ fn spawn_warp_instance<S: AliceSpawner, C: comit_client::ClientPool>(
fn spawn_comit_server<B: BobSpawner>(
settings: &ComitNodeSettings,
bob_spawner: Arc<B>,
client_factory: Arc<BamClientPool>,
runtime: &mut tokio::runtime::Runtime,
) {
runtime.spawn(
comit_server::listen(settings.comit.comit_listen, bob_spawner).map_err(|e| {
error!("ComitServer shutdown: {:?}", e);
}),
comit_server::listen(settings.comit.comit_listen, bob_spawner, client_factory).map_err(
|e| {
error!("ComitServer shutdown: {:?}", e);
},
),
);
}

Expand Down
6 changes: 6 additions & 0 deletions application/comit_node/src/comit_client/bam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ impl ClientFactory<BamClient> for BamClientPool {
}
}
}
fn add_client(&self, comit_node_socket_addr: SocketAddr, client: Arc<BamClient>) {
debug!("Adding {:?} to list of peers", comit_node_socket_addr);
let mut clients = self.clients.write().unwrap();

clients.insert(comit_node_socket_addr, client);
}
}

impl ClientPool for BamClientPool {
Expand Down
1 change: 1 addition & 0 deletions application/comit_node/src/comit_client/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,5 @@ impl ClientFactory<FakeClient> for FakeClientFactory {
) -> Result<Arc<FakeClient>, ClientFactoryError> {
Ok(Arc::clone(&self.fake_client))
}
fn add_client(&self, _comit_node_socket_addr: SocketAddr, _bam_client: Arc<FakeClient>) {}
}
1 change: 1 addition & 0 deletions application/comit_node/src/comit_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub trait Client: Send + Sync + 'static {

pub trait ClientFactory<C>: Send + Sync + Debug {
fn client_for(&self, comit_node_socket_addr: SocketAddr) -> Result<Arc<C>, ClientFactoryError>;
fn add_client(&self, comit_node_socket_addr: SocketAddr, client: Arc<C>);
}

pub trait ClientPool: Send + Sync + Debug + 'static {
Expand Down
17 changes: 15 additions & 2 deletions application/comit_node/src/comit_server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use crate::{bam_api::rfc003::swap_config, swap_protocols::rfc003::bob::BobSpawner};
use crate::{
bam_api::rfc003::swap_config,
comit_client::{
bam::{BamClient, BamClientPool},
ClientFactory,
},
swap_protocols::rfc003::bob::BobSpawner,
};
use bam::{connection::Connection, json};
use futures::{Future, Stream};
use std::{io, net::SocketAddr, sync::Arc};
Expand All @@ -7,6 +14,7 @@ use tokio::{self, net::TcpListener};
pub fn listen<B: BobSpawner>(
addr: SocketAddr,
bob_spawner: Arc<B>,
bam_client_pool: Arc<BamClientPool>,
) -> impl Future<Item = (), Error = io::Error> {
info!("ComitServer listening at {:?}", addr);
let socket = TcpListener::bind(&addr).unwrap();
Expand All @@ -18,7 +26,12 @@ pub fn listen<B: BobSpawner>(
let config = swap_config(Arc::clone(&bob_spawner));

let connection = Connection::new(config, codec, connection);
let (close_future, _client) = connection.start::<json::JsonFrameHandler>();
let (close_future, client) = connection.start::<json::JsonFrameHandler>();

if let Ok(addr) = peer_addr {
let bam_client = Arc::new(BamClient::new(addr, client));
bam_client_pool.add_client(addr, bam_client);
}

tokio::spawn(close_future.then(move |result| {
match result {
Expand Down

0 comments on commit 821ff5a

Please sign in to comment.