Skip to content

Commit

Permalink
feat: implement dynamic relay and rendezvous peer discovery, setup ka…
Browse files Browse the repository at this point in the history
…d with boot node
  • Loading branch information
fbozic committed May 29, 2024
1 parent 961e92f commit 8e2bef7
Show file tree
Hide file tree
Showing 11 changed files with 568 additions and 265 deletions.
1 change: 1 addition & 0 deletions examples/chat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ libp2p = { version = "0.53.2", features = [
"dns",
"gossipsub",
"identify",
"kad",
"macros",
"mdns",
"noise",
Expand Down
3 changes: 1 addition & 2 deletions examples/chat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ async fn main() -> eyre::Result<()> {
let (network_client, mut network_events) = network::run(
keypair.clone(),
opt.port,
libp2p::rendezvous::Namespace::new(opt.rendezvous_namespace)?,
opt.boot_nodes.clone(),
opt.boot_nodes.clone(),
libp2p::rendezvous::Namespace::new(opt.rendezvous_namespace)?,
)
.await?;

Expand Down
198 changes: 59 additions & 139 deletions examples/chat/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
use std::collections::hash_map::{self, HashMap};
use std::collections::BTreeMap;
use std::time::Duration;

use libp2p::futures::prelude::*;
use libp2p::swarm::{NetworkBehaviour, Swarm, SwarmEvent};
use libp2p::{
dcutr, gossipsub, identify, identity, mdns, noise, ping, relay, rendezvous, yamux, PeerId,
dcutr, gossipsub, identify, identity, kad, mdns, noise, ping, relay, rendezvous, yamux, PeerId,
};
use multiaddr::Multiaddr;
use tokio::sync::{mpsc, oneshot};
use tokio::time;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, info, trace, warn};

pub mod client;
pub mod discovery;
pub mod events;
pub mod types;

use client::NetworkClient;

const PROTOCOL_VERSION: &str = concat!("/", env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
const CALIMERO_KAD_PROTO_NAME: libp2p::StreamProtocol =
libp2p::StreamProtocol::new("/calimero/kad/1.0.0");

#[derive(NetworkBehaviour)]
struct Behaviour {
dcutr: dcutr::Behaviour,
identify: identify::Behaviour,
gossipsub: gossipsub::Behaviour,
identify: identify::Behaviour,
kad: kad::Behaviour<kad::store::MemoryStore>,
mdns: mdns::tokio::Behaviour,
ping: ping::Behaviour,
rendezvous: rendezvous::client::Behaviour,
Expand All @@ -34,34 +37,12 @@ struct Behaviour {
pub async fn run(
keypair: identity::Keypair,
port: u16,
boot_nodes: Vec<Multiaddr>,
rendezvous_namespace: rendezvous::Namespace,
relay_addresses: Vec<Multiaddr>,
rendezvous_addresses: Vec<Multiaddr>,
) -> eyre::Result<(NetworkClient, mpsc::Receiver<types::NetworkEvent>)> {
let mut rendezvous = BTreeMap::new();
for address in &rendezvous_addresses {
let entry = match peek_peer_id(&address) {
Ok(peer_id) => (peer_id, RendezvousEntry::new(address.clone())),
Err(err) => {
eyre::bail!("Failed to parse rendezvous PeerId: {}", err);
}
};
rendezvous.insert(entry.0, entry.1);
}

let mut relays = BTreeMap::new();
for address in &relay_addresses {
let entry = match peek_peer_id(&address) {
Ok(peer_id) => (peer_id, RelayEntry::new(address.clone())),
Err(err) => {
eyre::bail!("Failed to parse relay PeerId: {}", err);
}
};
relays.insert(entry.0, entry.1);
}

let (client, event_receiver, event_loop) =
init(keypair, relays, rendezvous_namespace, rendezvous).await?;
init(keypair, boot_nodes, rendezvous_namespace).await?;

tokio::spawn(event_loop.run());

let swarm_listen: Vec<Multiaddr> = vec![
Expand All @@ -73,47 +54,32 @@ pub async fn run(
client.listen_on(addr).await?;
}

tokio::spawn(run_init_dial(
client.clone(),
rendezvous_addresses,
relay_addresses,
));

Ok((client, event_receiver))
}

async fn run_init_dial(
client: NetworkClient,
rendezvous_addresses: Vec<Multiaddr>,
relay_addresses: Vec<Multiaddr>,
) {
tokio::time::sleep(Duration::from_secs(5)).await;

info!("Initiating dial to rendezvous and relay addresses.");

for addr in rendezvous_addresses
.into_iter()
.chain(relay_addresses)
.collect::<std::collections::HashSet<_>>()
.into_iter()
{
info!("Dialing address: {:?}", addr);
if let Err(err) = client.dial(addr).await {
error!("Failed to dial rendezvous address: {}", err);
};
}
}

async fn init(
keypair: identity::Keypair,
relays: BTreeMap<PeerId, RelayEntry>,
boot_nodes: Vec<Multiaddr>,
rendezvous_namespace: rendezvous::Namespace,
rendezvous: BTreeMap<PeerId, RendezvousEntry>,
) -> eyre::Result<(
NetworkClient,
mpsc::Receiver<types::NetworkEvent>,
EventLoop,
)> {
let bootstrap_peers = {
let mut peers = vec![];

for mut addr in boot_nodes {
let Some(multiaddr::Protocol::P2p(peer_id)) = addr.pop() else {
eyre::bail!("Failed to parse peer id from addr {:?}", addr);
};

peers.push((peer_id, addr));
}

peers
};

let peer_id = keypair.public().to_peer_id();
let swarm = libp2p::SwarmBuilder::with_existing_identity(keypair.clone())
.with_tokio()
Expand All @@ -130,6 +96,28 @@ async fn init(
identify::Config::new(PROTOCOL_VERSION.to_owned(), keypair.public())
.with_push_listen_addr_updates(true),
),
kad: {
let mut kademlia_config = kad::Config::default();
kademlia_config
.set_protocol_names(std::iter::once(CALIMERO_KAD_PROTO_NAME).collect());

let mut kademlia = kad::Behaviour::with_config(
peer_id,
kad::store::MemoryStore::new(peer_id),
kademlia_config,
);

kademlia.set_mode(Some(kad::Mode::Client));

for (peer_id, addr) in bootstrap_peers {
kademlia.add_address(&peer_id, addr);
}
if let Err(err) = kademlia.bootstrap() {
warn!(%err, "Failed to bootstrap Kademlia");
};

kademlia
},
gossipsub: gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(keypair.clone()),
gossipsub::Config::default(),
Expand All @@ -151,14 +139,7 @@ async fn init(
sender: command_sender,
};

let event_loop = EventLoop::new(
swarm,
command_receiver,
event_sender,
relays,
rendezvous_namespace,
rendezvous,
);
let event_loop = EventLoop::new(swarm, command_receiver, event_sender, rendezvous_namespace);

Ok((client, event_receiver, event_loop))
}
Expand All @@ -167,90 +148,30 @@ pub(crate) struct EventLoop {
swarm: Swarm<Behaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<types::NetworkEvent>,
relays: BTreeMap<PeerId, RelayEntry>,
network_state: discovery::model::DiscoveryState,
rendezvous_namespace: rendezvous::Namespace,
rendezvous: BTreeMap<PeerId, RendezvousEntry>,
pending_dial: HashMap<PeerId, oneshot::Sender<eyre::Result<Option<()>>>>,
}

#[derive(Debug)]
pub(crate) struct RelayEntry {
address: Multiaddr,
identify_state: IdentifyState,
reservation_state: RelayReservationState,
}

impl RelayEntry {
pub(crate) fn new(address: Multiaddr) -> Self {
Self {
address,
identify_state: Default::default(),
reservation_state: Default::default(),
}
}
}

#[derive(Debug)]
pub(crate) struct RendezvousEntry {
address: Multiaddr,
cookie: Option<rendezvous::Cookie>,
identify_state: IdentifyState,
}

impl RendezvousEntry {
pub(crate) fn new(address: Multiaddr) -> Self {
Self {
address,
cookie: Default::default(),
identify_state: Default::default(),
}
}
}

#[derive(Debug, Default, PartialEq)]
pub(crate) enum RelayReservationState {
#[default]
Unknown,
Requested,
Acquired,
}

#[derive(Debug, Default)]
pub(crate) struct IdentifyState {
sent: bool,
received: bool,
}

impl IdentifyState {
pub(crate) fn is_exchanged(&self) -> bool {
self.sent && self.received
}
}

impl EventLoop {
fn new(
swarm: Swarm<Behaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<types::NetworkEvent>,
relays: BTreeMap<PeerId, RelayEntry>,
rendezvous_namespace: rendezvous::Namespace,
rendezvous: BTreeMap<PeerId, RendezvousEntry>,
) -> Self {
Self {
swarm,
command_receiver,
event_sender,
relays,
rendezvous,
network_state: discovery::model::DiscoveryState::new(),
rendezvous_namespace,
pending_dial: Default::default(),
}
}

pub(crate) async fn run(mut self) {
let mut relays_dial_tick = tokio::time::interval(Duration::from_secs(90));
let mut rendezvous_discover_tick = tokio::time::interval(Duration::from_secs(30));
let mut rendezvous_dial_tick = tokio::time::interval(Duration::from_secs(90));
let mut rendezvous_discover_tick = tokio::time::interval(Duration::from_secs(90));

loop {
tokio::select! {
Expand All @@ -259,9 +180,8 @@ impl EventLoop {
let Some(c) = command else { break };
self.handle_command(c).await;
}
_ = relays_dial_tick.tick() => self.handle_relays_dial().await,
_ = rendezvous_dial_tick.tick() => self.handle_rendezvous_dial().await,
_ = rendezvous_discover_tick.tick() => self.handle_rendezvous_discover().await,
// _ = relay_dial_tick.tick() => self.handle_relay_reservations().await,
_ = rendezvous_discover_tick.tick() => self.handle_rendezvous_discoveries().await,
}
}
}
Expand Down Expand Up @@ -339,7 +259,7 @@ impl EventLoop {
.collect();
let count = peers.len();

let _ = sender.send(PeerInfo { count, peers });
let _ = sender.send(PeersInfo { count, peers });
}
Command::MeshPeerCount { topic, sender } => {
let peers: Vec<PeerId> = self
Expand All @@ -351,7 +271,7 @@ impl EventLoop {
.collect();
let count = peers.len();

let _ = sender.send(MeshPeerInfo { count, peers });
let _ = sender.send(MeshPeersInfo { count, peers });
}
}
}
Expand Down Expand Up @@ -381,24 +301,24 @@ pub(crate) enum Command {
sender: oneshot::Sender<eyre::Result<gossipsub::MessageId>>,
},
PeerInfo {
sender: oneshot::Sender<PeerInfo>,
sender: oneshot::Sender<PeersInfo>,
},
MeshPeerCount {
topic: gossipsub::TopicHash,
sender: oneshot::Sender<MeshPeerInfo>,
sender: oneshot::Sender<MeshPeersInfo>,
},
}

#[allow(dead_code)] // Info structs for pretty printing
#[derive(Debug)]
pub(crate) struct PeerInfo {
pub(crate) struct PeersInfo {
count: usize,
peers: Vec<PeerId>,
}

#[allow(dead_code)] // Info structs for pretty printing
#[derive(Debug)]
pub(crate) struct MeshPeerInfo {
pub(crate) struct MeshPeersInfo {
count: usize,
peers: Vec<PeerId>,
}
Expand Down
4 changes: 2 additions & 2 deletions examples/chat/src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl NetworkClient {

receiver.await.expect("Sender not to be dropped.")
}
pub async fn peer_info(&self) -> super::PeerInfo {
pub async fn peer_info(&self) -> super::PeersInfo {
let (sender, receiver) = oneshot::channel();

self.sender
Expand All @@ -88,7 +88,7 @@ impl NetworkClient {
receiver.await.expect("Sender not to be dropped.")
}

pub async fn mesh_peer_info(&self, topic: gossipsub::TopicHash) -> super::MeshPeerInfo {
pub async fn mesh_peer_info(&self, topic: gossipsub::TopicHash) -> super::MeshPeersInfo {
let (sender, receiver) = oneshot::channel();

self.sender
Expand Down
Loading

0 comments on commit 8e2bef7

Please sign in to comment.