Skip to content

Commit

Permalink
Add close RPC connections
Browse files Browse the repository at this point in the history
Added the ability to close RPC connections for a given peer:
- When dialing a peer (i.e. when establishing a new connection), a `drop_old_connections`
  switch can be included in the dial request from the client so that only one
  RPC client connection will ever be active.
  • Loading branch information
hansieodendaal committed Oct 28, 2024
1 parent 20e70fa commit 3730921
Show file tree
Hide file tree
Showing 36 changed files with 563 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl CommandContext {
let start = Instant::now();
println!("☎️ Dialing peer...");

match connectivity.dial_peer(dest_node_id).await {
match connectivity.dial_peer(dest_node_id, false).await {
Ok(connection) => {
println!("⚡️ Peer connected in {}ms!", start.elapsed().as_millis());
println!("Connection: {}", connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
}

async fn connect_to_sync_peer(&self, peer: NodeId) -> Result<PeerConnection, BlockSyncError> {
let connection = self.connectivity.dial_peer(peer).await?;
let connection = self.connectivity.dial_peer(peer, false).await?;
Ok(connection)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
let timer = Instant::now();
debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id);
let conn = self.connectivity.dial_peer(node_id.clone()).await?;
let conn = self.connectivity.dial_peer(node_id.clone(), false).await?;
info!(
target: LOG_TARGET,
"Successfully dialed sync peer {} in {:.2?}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, HorizonSyncError> {
let timer = Instant::now();
debug!(target: LOG_TARGET, "Dialing {} sync peer", node_id);
let conn = self.connectivity.dial_peer(node_id.clone()).await?;
let conn = self.connectivity.dial_peer(node_id.clone(), false).await?;
info!(
target: LOG_TARGET,
"Successfully dialed sync peer {} in {:.2?}",
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/tests/tests/node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ async fn propagate_and_forward_invalid_block() {
alice_node
.comms
.connectivity()
.dial_peer(bob_node.node_identity.node_id().clone())
.dial_peer(bob_node.node_identity.node_id().clone(), false)
.await
.unwrap();
wait_until_online(&[&alice_node, &bob_node, &carol_node, &dan_node]).await;
Expand Down
10 changes: 7 additions & 3 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ impl WalletConnectivityService {
}

async fn try_setup_rpc_pool(&mut self, peer_node_id: NodeId) -> Result<bool, WalletConnectivityError> {
let conn = match self.try_dial_peer(peer_node_id.clone()).await? {
let conn = match self.try_dial_peer(peer_node_id.clone(), true).await? {
Some(c) => c,
None => {
warn!(target: LOG_TARGET, "Could not dial base node peer '{}'", peer_node_id);
Expand All @@ -413,14 +413,18 @@ impl WalletConnectivityService {
Ok(true)
}

async fn try_dial_peer(&mut self, peer: NodeId) -> Result<Option<PeerConnection>, WalletConnectivityError> {
async fn try_dial_peer(
&mut self,
peer: NodeId,
drop_old_connections: bool,
) -> Result<Option<PeerConnection>, WalletConnectivityError> {
tokio::select! {
biased;

_ = self.base_node_watch_receiver.changed() => {
Ok(None)
}
result = self.connectivity.dial_peer(peer) => {
result = self.connectivity.dial_peer(peer, drop_old_connections) => {
Ok(Some(result?))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ where
target: LOG_TARGET,
"Attempting UTXO sync with seed peer {} ({})", self.peer_index, peer,
);
match self.resources.comms_connectivity.dial_peer(peer.clone()).await {
match self.resources.comms_connectivity.dial_peer(peer.clone(), true).await {
Ok(conn) => Ok(conn),
Err(e) => {
self.publish_event(UtxoScannerEvent::ConnectionFailedToBaseNode {
Expand Down
8 changes: 4 additions & 4 deletions base_layer/wallet/tests/transaction_service_tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ async fn manage_single_transaction() {

let _peer_connection = bob_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone())
.dial_peer(alice_node_identity.node_id().clone(), false)
.await
.unwrap();

Expand Down Expand Up @@ -753,7 +753,7 @@ async fn large_interactive_transaction() {
// Verify that Alice and Bob are connected
let _peer_connection = bob_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone())
.dial_peer(alice_node_identity.node_id().clone(), false)
.await
.unwrap();

Expand Down Expand Up @@ -2172,15 +2172,15 @@ async fn manage_multiple_transactions() {

let _peer_connection = bob_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone())
.dial_peer(alice_node_identity.node_id().clone(), false)
.await
.unwrap();
sleep(Duration::from_secs(3)).await;

// Connect alice to carol
let _peer_connection = alice_comms
.connectivity()
.dial_peer(carol_node_identity.node_id().clone())
.dial_peer(carol_node_identity.node_id().clone(), false)
.await
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12731,7 +12731,7 @@ mod test {
.block_on(
alice_wallet_comms
.connectivity()
.dial_peer(bob_node_identity.node_id().clone()),
.dial_peer(bob_node_identity.node_id().clone(), false),
)
.is_ok();
}
Expand All @@ -12740,7 +12740,7 @@ mod test {
.block_on(
bob_wallet_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone()),
.dial_peer(alice_node_identity.node_id().clone(), false),
)
.is_ok();
}
Expand Down Expand Up @@ -12811,7 +12811,7 @@ mod test {
let bob_comms_dial_peer = bob_wallet_runtime.block_on(
bob_wallet_comms
.connectivity()
.dial_peer(alice_node_identity.node_id().clone()),
.dial_peer(alice_node_identity.node_id().clone(), false),
);
if let Ok(mut connection_to_alice) = bob_comms_dial_peer {
if bob_wallet_runtime
Expand Down
2 changes: 1 addition & 1 deletion comms/core/examples/stress/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl StressTestService {
self.comms_node.peer_manager().add_peer(peer).await?;
println!("Dialing peer `{}`...", node_id.short_str());
let start = Instant::now();
let conn = self.comms_node.connectivity().dial_peer(node_id).await?;
let conn = self.comms_node.connectivity().dial_peer(node_id, false).await?;
println!("Dial completed successfully in {:.2?}", start.elapsed());
let outbound_tx = self.outbound_tx.clone();
let inbound_rx = self.inbound_rx.clone();
Expand Down
4 changes: 2 additions & 2 deletions comms/core/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ async fn peer_to_peer_custom_protocols() {
let mut conn_man_events2 = comms_node2.subscribe_connection_manager_events();

let mut conn1 = conn_man_requester1
.dial_peer(node_identity2.node_id().clone())
.dial_peer(node_identity2.node_id().clone(), false)
.await
.unwrap();

Expand Down Expand Up @@ -347,7 +347,7 @@ async fn peer_to_peer_messaging_simultaneous() {

comms_node1
.connectivity()
.dial_peer(comms_node2.node_identity().node_id().clone())
.dial_peer(comms_node2.node_identity().node_id().clone(), false)
.await
.unwrap();
// Simultaneously send messages between the two nodes
Expand Down
9 changes: 7 additions & 2 deletions comms/core/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub(crate) enum DialerRequest {
Dial(
Box<Peer>,
Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
bool,
),
CancelPendingDial(NodeId),
NotifyNewInboundConnection(Box<PeerConnection>),
Expand Down Expand Up @@ -176,8 +177,8 @@ where
debug!(target: LOG_TARGET, "Connection dialer got request: {:?}", request);

match request {
Dial(peer, reply_tx) => {
self.handle_dial_peer_request(pending_dials, peer, reply_tx);
Dial(peer, reply_tx, drop_old_connections) => {
self.handle_dial_peer_request(pending_dials, peer, reply_tx, drop_old_connections);
},
CancelPendingDial(peer_id) => {
self.cancel_dial(&peer_id);
Expand Down Expand Up @@ -318,6 +319,7 @@ where
pending_dials: &mut DialFuturesUnordered,
peer: Box<Peer>,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
drop_old_connections: bool,
) {
if self.is_pending_dial(&peer.node_id) {
debug!(
Expand Down Expand Up @@ -371,6 +373,7 @@ where
let result = Self::perform_socket_upgrade_procedure(
&peer_manager,
&node_identity,
drop_old_connections,
socket,
addr.clone(),
authenticated_public_key,
Expand Down Expand Up @@ -421,6 +424,7 @@ where
async fn perform_socket_upgrade_procedure(
peer_manager: &PeerManager,
node_identity: &NodeIdentity,
drop_old_connections: bool,
mut socket: NoiseSocket<TTransport::Output>,
dialed_addr: Multiaddr,
authenticated_public_key: CommsPublicKey,
Expand Down Expand Up @@ -474,6 +478,7 @@ where
muxer,
dialed_addr,
NodeId::from_public_key(&authenticated_public_key),
drop_old_connections,
peer_identity.claim.features,
CONNECTION_DIRECTION,
conn_man_notifier,
Expand Down
1 change: 1 addition & 0 deletions comms/core/src/connection_manager/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ where
muxer,
peer_addr,
peer.node_id.clone(),
false,
peer.features,
CONNECTION_DIRECTION,
conn_man_notifier,
Expand Down
13 changes: 10 additions & 3 deletions comms/core/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,17 @@ where
use ConnectionManagerRequest::{CancelDial, DialPeer, NotifyListening};
trace!(target: LOG_TARGET, "Connection manager got request: {:?}", request);
match request {
DialPeer { node_id, reply_tx } => {
DialPeer {
node_id,
reply_tx,
drop_old_connections,
} => {
let tracing_id = tracing::Span::current().id();
let span = span!(Level::TRACE, "connection_manager::handle_request");
span.follows_from(tracing_id);
self.dial_peer(node_id, reply_tx).instrument(span).await
self.dial_peer(node_id, reply_tx, drop_old_connections)
.instrument(span)
.await
},
CancelDial(node_id) => {
if let Err(err) = self.dialer_tx.send(DialerRequest::CancelPendingDial(node_id)).await {
Expand Down Expand Up @@ -500,10 +506,11 @@ where
&mut self,
node_id: NodeId,
reply: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
drop_old_connections: bool,
) {
match self.peer_manager.find_by_node_id(&node_id).await {
Ok(Some(peer)) => {
self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply))
self.send_dialer_request(DialerRequest::Dial(Box::new(peer), reply, drop_old_connections))
.await;
},
Ok(None) => {
Expand Down
11 changes: 8 additions & 3 deletions comms/core/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub fn create(
connection: Yamux,
peer_addr: Multiaddr,
peer_node_id: NodeId,
drop_old_connections: bool,
peer_features: PeerFeatures,
direction: ConnectionDirection,
event_notifier: mpsc::Sender<ConnectionManagerEvent>,
Expand All @@ -91,6 +92,7 @@ pub fn create(
id,
peer_tx,
peer_node_id.clone(),
drop_old_connections,
peer_features,
peer_addr,
direction,
Expand Down Expand Up @@ -131,6 +133,7 @@ pub type ConnectionId = usize;
pub struct PeerConnection {
id: ConnectionId,
peer_node_id: NodeId,
drop_old_connections: bool,
peer_features: PeerFeatures,
request_tx: mpsc::Sender<PeerConnectionRequest>,
address: Arc<Multiaddr>,
Expand All @@ -148,6 +151,7 @@ impl PeerConnection {
id: ConnectionId,
request_tx: mpsc::Sender<PeerConnectionRequest>,
peer_node_id: NodeId,
drop_old_connections: bool,
peer_features: PeerFeatures,
address: Multiaddr,
direction: ConnectionDirection,
Expand All @@ -157,6 +161,7 @@ impl PeerConnection {
id,
request_tx,
peer_node_id,
drop_old_connections,
peer_features,
address: Arc::new(address),
direction,
Expand Down Expand Up @@ -256,15 +261,15 @@ impl PeerConnection {
let protocol = ProtocolId::from_static(T::PROTOCOL_NAME);
debug!(
target: LOG_TARGET,
"Attempting to establish RPC protocol `{}` to peer `{}`",
String::from_utf8_lossy(&protocol),
self.peer_node_id
"Attempting to establish RPC protocol `{}` to peer `{}` (drop_old_connections {})",
String::from_utf8_lossy(&protocol), self.peer_node_id, self.drop_old_connections
);
let framed = self.open_framed_substream(&protocol, RPC_MAX_FRAME_SIZE).await?;

let rpc_client = builder
.with_protocol_id(protocol)
.with_node_id(self.peer_node_id.clone())
.with_drop_old_connections(self.drop_old_connections)
.with_terminate_signal(self.drop_notifier.to_signal())
.connect(framed)
.await?;
Expand Down
16 changes: 14 additions & 2 deletions comms/core/src/connection_manager/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@

use std::sync::Arc;

use log::trace;
use tokio::sync::{broadcast, mpsc, oneshot};

use super::{error::ConnectionManagerError, peer_connection::PeerConnection};
use crate::{
connection_manager::manager::{ConnectionManagerEvent, ListenerInfo},
peer_manager::NodeId,
};
const LOG_TARGET: &str = "comms::connectivity::manager::requester";

/// Requests which are handled by the ConnectionManagerService
#[derive(Debug)]
Expand All @@ -37,6 +39,7 @@ pub enum ConnectionManagerRequest {
DialPeer {
node_id: NodeId,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
drop_old_connections: bool,
},
/// Cancels a pending dial if one exists
CancelDial(NodeId),
Expand Down Expand Up @@ -77,7 +80,7 @@ impl ConnectionManagerRequester {
/// Attempt to connect to a remote peer
pub async fn dial_peer(&mut self, node_id: NodeId) -> Result<PeerConnection, ConnectionManagerError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.send_dial_peer(node_id, Some(reply_tx)).await?;
self.send_dial_peer(node_id, Some(reply_tx), false).await?;
reply_rx
.await
.map_err(|_| ConnectionManagerError::ActorRequestCanceled)?
Expand All @@ -97,9 +100,18 @@ impl ConnectionManagerRequester {
&mut self,
node_id: NodeId,
reply_tx: Option<oneshot::Sender<Result<PeerConnection, ConnectionManagerError>>>,
drop_old_connections: bool,
) -> Result<(), ConnectionManagerError> {
trace!(
target: LOG_TARGET, "send_dial_peer: peer: {}, drop_old_connections: {}",
node_id.short_str(), drop_old_connections
);
self.sender
.send(ConnectionManagerRequest::DialPeer { node_id, reply_tx })
.send(ConnectionManagerRequest::DialPeer {
node_id,
reply_tx,
drop_old_connections,
})
.await
.map_err(|_| ConnectionManagerError::SendToActorFailed)?;
Ok(())
Expand Down
Loading

0 comments on commit 3730921

Please sign in to comment.