Skip to content

Commit

Permalink
feat(rendezvous): directly return error from register
Browse files Browse the repository at this point in the history
Resolves #4070.

Pull-Request: #4073.
  • Loading branch information
dgarus authored Jun 17, 2023
1 parent b8ceecc commit c4ab04c
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 69 deletions.
20 changes: 16 additions & 4 deletions examples/rendezvous/src/bin/rzv-identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,14 @@ async fn main() {
SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Received {
..
})) => {
swarm.behaviour_mut().rendezvous.register(
if let Err(error) = swarm.behaviour_mut().rendezvous.register(
rendezvous::Namespace::from_static("rendezvous"),
rendezvous_point,
None,
);
) {
log::error!("Failed to register: {error}");
return;
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(
rendezvous::client::Event::Registered {
Expand All @@ -99,9 +102,18 @@ async fn main() {
);
}
SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(
rendezvous::client::Event::RegisterFailed(error),
rendezvous::client::Event::RegisterFailed {
rendezvous_node,
namespace,
error,
},
)) => {
log::error!("Failed to register {}", error);
log::error!(
"Failed to register: rendezvous_node={}, namespace={}, error_code={:?}",
rendezvous_node,
namespace,
error
);
return;
}
SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event {
Expand Down
20 changes: 16 additions & 4 deletions examples/rendezvous/src/bin/rzv-register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ async fn main() {
log::error!("Lost connection to rendezvous point {}", error);
}
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == rendezvous_point => {
swarm.behaviour_mut().rendezvous.register(
if let Err(error) = swarm.behaviour_mut().rendezvous.register(
rendezvous::Namespace::from_static("rendezvous"),
rendezvous_point,
None,
);
) {
log::error!("Failed to register: {error}");
return;
}
log::info!("Connection established with rendezvous point {}", peer_id);
}
// once `/identify` did its job, we know our external address and can register
Expand All @@ -97,9 +100,18 @@ async fn main() {
);
}
SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(
rendezvous::client::Event::RegisterFailed(error),
rendezvous::client::Event::RegisterFailed {
rendezvous_node,
namespace,
error,
},
)) => {
log::error!("Failed to register {}", error);
log::error!(
"Failed to register: rendezvous_node={}, namespace={}, error_code={:?}",
rendezvous_node,
namespace,
error
);
return;
}
SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event {
Expand Down
9 changes: 8 additions & 1 deletion protocols/rendezvous/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
## 0.13.0 - unreleased

- Changed the signature of the function `client::Behavior::register()`,
it returns `Result<(), RegisterError>` now.
Remove the `Remote` variant from `RegisterError` and instead put the information from `Remote`
directly into the variant from the `Event` enum.
See [PR 4073].

- Raise MSRV to 1.65.
See [PR 3715].

[PR 4073]: https://github.com/libp2p/rust-libp2p/pull/4073
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715

## 0.12.1
Expand All @@ -25,7 +32,7 @@

- Update to `libp2p-swarm` `v0.41.0`.

- Replace `Client` and `Server`'s `NetworkBehaviour` implemention `inject_*` methods with the new `on_*` methods.
- Replace `Client` and `Server`'s `NetworkBehaviour` implementation `inject_*` methods with the new `on_*` methods.
See [PR 3011].

- Update `rust-version` to reflect the actual MSRV: 1.62.0. See [PR 3090].
Expand Down
65 changes: 26 additions & 39 deletions protocols/rendezvous/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use libp2p_swarm::{
ConnectionDenied, ConnectionId, ExternalAddresses, FromSwarm, NetworkBehaviour, PollParameters,
THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::iter;
use std::task::{Context, Poll};
use std::time::Duration;
Expand All @@ -41,8 +41,6 @@ pub struct Behaviour {

keypair: Keypair,

error_events: VecDeque<Event>,

waiting_for_register: HashMap<RequestId, (PeerId, Namespace)>,
waiting_for_discovery: HashMap<RequestId, (PeerId, Option<Namespace>)>,

Expand All @@ -66,7 +64,6 @@ impl Behaviour {
iter::once((crate::PROTOCOL_IDENT, ProtocolSupport::Outbound)),
libp2p_request_response::Config::default(),
),
error_events: Default::default(),
keypair,
waiting_for_register: Default::default(),
waiting_for_discovery: Default::default(),
Expand All @@ -82,30 +79,26 @@ impl Behaviour {
///
/// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported
/// by other [`NetworkBehaviour`]s via [`ToSwarm::ExternalAddrConfirmed`].
pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option<Ttl>) {
pub fn register(
&mut self,
namespace: Namespace,
rendezvous_node: PeerId,
ttl: Option<Ttl>,
) -> Result<(), RegisterError> {
let external_addresses = self.external_addresses.iter().cloned().collect::<Vec<_>>();
if external_addresses.is_empty() {
self.error_events
.push_back(Event::RegisterFailed(RegisterError::NoExternalAddresses));

return;
return Err(RegisterError::NoExternalAddresses);
}

match PeerRecord::new(&self.keypair, external_addresses) {
Ok(peer_record) => {
let req_id = self.inner.send_request(
&rendezvous_node,
Register(NewRegistration::new(namespace.clone(), peer_record, ttl)),
);
self.waiting_for_register
.insert(req_id, (rendezvous_node, namespace));
}
Err(signing_error) => {
self.error_events.push_back(Event::RegisterFailed(
RegisterError::FailedToMakeRecord(signing_error),
));
}
};
let peer_record = PeerRecord::new(&self.keypair, external_addresses)?;
let req_id = self.inner.send_request(
&rendezvous_node,
Register(NewRegistration::new(namespace.clone(), peer_record, ttl)),
);
self.waiting_for_register
.insert(req_id, (rendezvous_node, namespace));

Ok(())
}

/// Unregister ourselves from the given namespace with the given rendezvous peer.
Expand Down Expand Up @@ -148,12 +141,6 @@ pub enum RegisterError {
NoExternalAddresses,
#[error("Failed to make a new PeerRecord")]
FailedToMakeRecord(#[from] SigningError),
#[error("Failed to register with Rendezvous node")]
Remote {
rendezvous_node: PeerId,
namespace: Namespace,
error: ErrorCode,
},
}

#[derive(Debug)]
Expand All @@ -178,7 +165,11 @@ pub enum Event {
namespace: Namespace,
},
/// We failed to register with the contained rendezvous node.
RegisterFailed(RegisterError),
RegisterFailed {
rendezvous_node: PeerId,
namespace: Namespace,
error: ErrorCode,
},
/// The connection details we learned from this node expired.
Expired { peer: PeerId },
}
Expand Down Expand Up @@ -239,10 +230,6 @@ impl NetworkBehaviour for Behaviour {
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
use libp2p_request_response as req_res;

if let Some(event) = self.error_events.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}

loop {
match self.inner.poll(cx, params) {
Poll::Ready(ToSwarm::GenerateEvent(req_res::Event::Message {
Expand Down Expand Up @@ -337,11 +324,11 @@ impl NetworkBehaviour for Behaviour {
impl Behaviour {
fn event_for_outbound_failure(&mut self, req_id: &RequestId) -> Option<Event> {
if let Some((rendezvous_node, namespace)) = self.waiting_for_register.remove(req_id) {
return Some(Event::RegisterFailed(RegisterError::Remote {
return Some(Event::RegisterFailed {
rendezvous_node,
namespace,
error: ErrorCode::Unavailable,
}));
});
};

if let Some((rendezvous_node, namespace)) = self.waiting_for_discovery.remove(req_id) {
Expand Down Expand Up @@ -374,11 +361,11 @@ impl Behaviour {
if let Some((rendezvous_node, namespace)) =
self.waiting_for_register.remove(request_id)
{
return Some(Event::RegisterFailed(RegisterError::Remote {
return Some(Event::RegisterFailed {
rendezvous_node,
namespace,
error: error_code,
}));
});
}

None
Expand Down
65 changes: 44 additions & 21 deletions protocols/rendezvous/tests/rendezvous.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use libp2p_identity as identity;
use libp2p_rendezvous as rendezvous;
use libp2p_rendezvous::client::RegisterError;
use libp2p_swarm::{DialError, Swarm, SwarmEvent};
use libp2p_swarm_test::SwarmExt;
use std::convert::TryInto;
Expand All @@ -36,7 +37,8 @@ async fn given_successful_registration_then_successful_discovery() {

alice
.behaviour_mut()
.register(namespace.clone(), *robert.local_peer_id(), None);
.register(namespace.clone(), *robert.local_peer_id(), None)
.unwrap();

match libp2p_swarm_test::drive(&mut alice, &mut robert).await {
(
Expand Down Expand Up @@ -79,6 +81,21 @@ async fn given_successful_registration_then_successful_discovery() {
}
}

#[tokio::test]
async fn should_return_error_when_no_external_addresses() {
let _ = env_logger::try_init();
let namespace = rendezvous::Namespace::from_static("some-namespace");
let server = new_server(rendezvous::server::Config::default()).await;
let mut client = Swarm::new_ephemeral(rendezvous::client::Behaviour::new);

let actual = client
.behaviour_mut()
.register(namespace.clone(), *server.local_peer_id(), None)
.unwrap_err();

assert!(matches!(actual, RegisterError::NoExternalAddresses))
}

#[tokio::test]
async fn given_successful_registration_then_refresh_ttl() {
let _ = env_logger::try_init();
Expand All @@ -91,7 +108,8 @@ async fn given_successful_registration_then_refresh_ttl() {

alice
.behaviour_mut()
.register(namespace.clone(), roberts_peer_id, None);
.register(namespace.clone(), roberts_peer_id, None)
.unwrap();

match libp2p_swarm_test::drive(&mut alice, &mut robert).await {
(
Expand All @@ -114,7 +132,8 @@ async fn given_successful_registration_then_refresh_ttl() {

alice
.behaviour_mut()
.register(namespace.clone(), roberts_peer_id, Some(refresh_ttl));
.register(namespace.clone(), roberts_peer_id, Some(refresh_ttl))
.unwrap();

match libp2p_swarm_test::drive(&mut alice, &mut robert).await {
(
Expand Down Expand Up @@ -150,18 +169,18 @@ async fn given_invalid_ttl_then_unsuccessful_registration() {
let ([mut alice], mut robert) =
new_server_with_connected_clients(rendezvous::server::Config::default()).await;

alice.behaviour_mut().register(
namespace.clone(),
*robert.local_peer_id(),
Some(100_000_000),
);
alice
.behaviour_mut()
.register(
namespace.clone(),
*robert.local_peer_id(),
Some(100_000_000),
)
.unwrap();

match libp2p_swarm_test::drive(&mut alice, &mut robert).await {
(
[rendezvous::client::Event::RegisterFailed(rendezvous::client::RegisterError::Remote {
error,
..
})],
[rendezvous::client::Event::RegisterFailed { error, .. }],
[rendezvous::server::Event::PeerNotRegistered { .. }],
) => {
assert_eq!(error, rendezvous::ErrorCode::InvalidTtl);
Expand All @@ -182,7 +201,8 @@ async fn discover_allows_for_dial_by_peer_id() {

alice
.behaviour_mut()
.register(namespace.clone(), roberts_peer_id, None);
.register(namespace.clone(), roberts_peer_id, None)
.unwrap();
match alice.next_behaviour_event().await {
rendezvous::client::Event::Registered { .. } => {}
event => panic!("Unexpected event: {event:?}"),
Expand Down Expand Up @@ -233,14 +253,14 @@ async fn eve_cannot_register() {
eve.connect(&mut robert).await;

eve.behaviour_mut()
.register(namespace.clone(), *robert.local_peer_id(), None);
.register(namespace.clone(), *robert.local_peer_id(), None)
.unwrap();

match libp2p_swarm_test::drive(&mut eve, &mut robert).await {
(
[rendezvous::client::Event::RegisterFailed(rendezvous::client::RegisterError::Remote {
error: err_code,
..
})],
[rendezvous::client::Event::RegisterFailed {
error: err_code, ..
}],
[rendezvous::server::Event::PeerNotRegistered { .. }],
) => {
assert_eq!(err_code, rendezvous::ErrorCode::NotAuthorized);
Expand All @@ -263,7 +283,8 @@ async fn can_combine_client_and_server() {
charlie
.behaviour_mut()
.client
.register(namespace.clone(), *robert.local_peer_id(), None);
.register(namespace.clone(), *robert.local_peer_id(), None)
.unwrap();
match libp2p_swarm_test::drive(&mut charlie, &mut robert).await {
(
[CombinedEvent::Client(rendezvous::client::Event::Registered { .. })],
Expand All @@ -274,7 +295,8 @@ async fn can_combine_client_and_server() {

alice
.behaviour_mut()
.register(namespace, *charlie.local_peer_id(), None);
.register(namespace, *charlie.local_peer_id(), None)
.unwrap();
match libp2p_swarm_test::drive(&mut charlie, &mut alice).await {
(
[CombinedEvent::Server(rendezvous::server::Event::PeerRegistered { .. })],
Expand All @@ -299,7 +321,8 @@ async fn registration_on_clients_expire() {

alice
.behaviour_mut()
.register(namespace.clone(), roberts_peer_id, Some(registration_ttl));
.register(namespace.clone(), roberts_peer_id, Some(registration_ttl))
.unwrap();
match alice.next_behaviour_event().await {
rendezvous::client::Event::Registered { .. } => {}
event => panic!("Unexpected event: {event:?}"),
Expand Down

0 comments on commit c4ab04c

Please sign in to comment.