Skip to content

Commit

Permalink
refactor: review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Jan 24, 2024
1 parent 67c466c commit 9a9f3d3
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 31 deletions.
2 changes: 1 addition & 1 deletion crates/torii/libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
edition.workspace = true
license-file.workspace = true
license.workspace = true
name = "torii-libp2p"
name = "torii-relay"
repository.workspace = true
version.workspace = true

Expand Down
22 changes: 12 additions & 10 deletions crates/torii/libp2p/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Behaviour {
ping: ping::Behaviour,
}

pub struct Libp2pClient {
pub struct RelayClient {
pub command_sender: Sender<Command>,
pub message_receiver: Receiver<Message>,
pub event_loop: EventLoop,
Expand All @@ -42,13 +42,13 @@ pub enum Command {
Publish(ClientMessage),
}

impl Libp2pClient {
impl RelayClient {
#[cfg(not(target_arch = "wasm32"))]
pub fn new(relay_addr: String) -> Result<Self, Error> {
let local_key = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(local_key.public());

info!("Local peer id: {:?}", peer_id);
info!(target: "torii::relay::client", peer_id = %peer_id, "Local peer id");

let mut swarm = libp2p::SwarmBuilder::with_existing_identity(local_key)
.with_tokio()
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Libp2pClient {
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(60)))
.build();

info!("Dialing relay: {:?}", relay_addr);
info!(target: "torii::relay::client", addr = %relay_addr, "Dialing relay");
swarm.dial(relay_addr.parse::<Multiaddr>()?)?;

let (message_sender, message_receiver) = futures::channel::mpsc::channel(0);
Expand All @@ -93,7 +93,7 @@ impl Libp2pClient {
let local_key = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(local_key.public());

info!("Local peer id: {:?}", peer_id);
info!(target: "torii::relay::client", peer_id = %peer_id, "Local peer id");

let mut swarm = libp2p::SwarmBuilder::with_existing_identity(local_key)
.with_wasm_bindgen()
Expand Down Expand Up @@ -123,7 +123,7 @@ impl Libp2pClient {
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(60)))
.build();

info!("Dialing relay: {:?}", relay_addr);
info!(target: "torii::relay::client", addr = %relay_addr, "Dialing relay");
swarm.dial(relay_addr.parse::<Multiaddr>()?)?;

let (message_sender, message_receiver) = futures::channel::mpsc::channel(0);
Expand Down Expand Up @@ -171,14 +171,16 @@ impl EventLoop {
}
}
SwarmEvent::ConnectionClosed { cause: Some(cause), .. } => {
tracing::info!("Swarm event: {:?}", cause);
info!(target: "torii::relay::client", cause = ?cause, "Connection closed");

if let libp2p::swarm::ConnectionError::KeepAliveTimeout = cause {
tracing::info!("Keep alive timeout, shutting down");
// return;
info!(target: "torii::relay::client", "Connection closed due to keep alive timeout. Shutting down client.");
return;
}
}
evt => tracing::info!("Swarm event: {:?}", evt),
evt => {
info!(target: "torii::relay::client", event = ?evt, "Unhandled event");
}
}
},
}
Expand Down
36 changes: 23 additions & 13 deletions crates/torii/libp2p/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ pub struct Behaviour {
gossipsub: gossipsub::Behaviour,
}

pub struct Libp2pRelay {
pub struct Relay {
swarm: Swarm<Behaviour>,
}

impl Libp2pRelay {
impl Relay {
pub fn new(
port: u16,
port_webrtc: u16,
Expand Down Expand Up @@ -136,9 +136,12 @@ impl Libp2pRelay {
.expect("Failed to deserialize message");

info!(
"Received message {:?} from peer {:?} with topic {:?} and data \
{:?}",
message_id, peer_id, message.topic, message.data
target: "torii::relay::server",
message_id = %message_id,
peer_id = %peer_id,
topic = %message.topic,
data = %String::from_utf8_lossy(&message.data),
"Received message"
);

// forward message to room
Expand All @@ -160,19 +163,26 @@ impl Libp2pRelay {
peer_id,
}) => {
info!(
"Received identify event from peer {:?} with observed address {:?}",
peer_id, observed_addr
target: "torii::relay::server",
peer_id = %peer_id,
observed_addr = %observed_addr,
"Received identify event"
);
self.swarm.add_external_address(observed_addr.clone());
}
ServerEvent::Ping(ping::Event { peer, result, .. }) => {
info!("Ping success from peer {:?} with result {:?}", peer, result);
info!(
target: "torii::relay::server",
peer_id = %peer,
result = ?result,
"Received ping event"
);
}
_ => {}
}
}
SwarmEvent::NewListenAddr { address, .. } => {
info!("Listening on {:?}", address);
info!(target: "torii::relay::server", address = %address, "New listen address");
}
_ => {}
}
Expand All @@ -184,7 +194,7 @@ fn read_or_create_identity(path: &Path) -> anyhow::Result<identity::Keypair> {
if path.exists() {
let bytes = fs::read(path)?;

info!("Using existing identity from {}", path.display());
info!(target: "torii::relay::server", path = %path.display(), "Using existing identity");

return Ok(identity::Keypair::from_protobuf_encoding(&bytes)?); // This only works for ed25519 but that is what we are using.
}
Expand All @@ -193,7 +203,7 @@ fn read_or_create_identity(path: &Path) -> anyhow::Result<identity::Keypair> {

fs::write(path, identity.to_protobuf_encoding()?)?;

info!("Generated new identity and wrote it to {}", path.display());
info!(target: "torii::relay::server", path = %path.display(), "Generated new identity");

Ok(identity)
}
Expand All @@ -202,15 +212,15 @@ fn read_or_create_certificate(path: &Path) -> anyhow::Result<Certificate> {
if path.exists() {
let pem = fs::read_to_string(path)?;

info!("Using existing certificate from {}", path.display());
info!(target: "torii::relay::server", path = %path.display(), "Using existing certificate");

return Ok(Certificate::from_pem(&pem)?);
}

let cert = Certificate::generate(&mut rand::thread_rng())?;
fs::write(path, cert.serialize_pem().as_bytes())?;

info!("Generated new certificate and wrote it to {}", path.display());
info!(target: "torii::relay::server", path = %path.display(), "Generated new certificate");

Ok(cert)
}
14 changes: 7 additions & 7 deletions crates/torii/libp2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod test {

use futures::{SinkExt, StreamExt};

use crate::client::{Command, Libp2pClient};
use crate::client::{Command, RelayClient};
use crate::types::ClientMessage;

#[cfg(target_arch = "wasm32")]
Expand All @@ -21,17 +21,17 @@ mod test {
use tokio::time::sleep;
use tokio::{self, select};

use crate::server::Libp2pRelay;
use crate::server::Relay;

let _ = tracing_subscriber::fmt().with_env_filter("torii_libp2p=debug").try_init();
let _ = tracing_subscriber::fmt().with_env_filter("torii::relay::client=debug,torii::relay::server=debug").try_init();
// Initialize the relay server
let mut relay_server: Libp2pRelay = Libp2pRelay::new(9090, 9091, None, None)?;
let mut relay_server: Relay = Relay::new(9090, 9091, None, None)?;
tokio::spawn(async move {
relay_server.run().await;
});

// Initialize the first client (listener)
let mut client = Libp2pClient::new("/ip4/127.0.0.1/tcp/9090".to_string())?;
let mut client = RelayClient::new("/ip4/127.0.0.1/tcp/9090".to_string())?;
tokio::spawn(async move {
client.event_loop.run().await;
});
Expand Down Expand Up @@ -76,9 +76,9 @@ mod test {
let _ = tracing_subscriber::fmt().with_env_filter("torii_libp2p=debug").try_init();
// Initialize the first client (listener)
// Make sure the cert hash is correct - corresponding to the cert in the relay server
let mut client = Libp2pClient::new(
let mut client = RelayClient::new(
"/ip4/127.0.0.1/udp/9091/webrtc-direct/certhash/\
uEiBDKAUMioKpVK2CLQjtOL9eLPmaJkTcPPbBMtau7XaPGA"
uEiD6v3wzt8XU3s3SqgNSBJPvn9E0VMVFm8-G0iSEsIIDxw"
.to_string(),
)?;

Expand Down

0 comments on commit 9a9f3d3

Please sign in to comment.