Skip to content

Commit

Permalink
Do not wait until peer is connected on addPeer
Browse files Browse the repository at this point in the history
  • Loading branch information
muXxer committed Mar 26, 2024
1 parent df5973f commit 9c61b05
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 152 deletions.
2 changes: 1 addition & 1 deletion components/p2p/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func connectConfigKnownPeers() {
Component.LogPanicf("invalid peer address info: %s", err)
}

if err := deps.NetworkManager.AddManualPeers(multiAddr); err != nil {
if _, err := deps.NetworkManager.AddManualPeer(multiAddr); err != nil {
Component.LogInfof("failed to add peer: %s, error: %s", multiAddr.String(), err)
}
}
Expand Down
102 changes: 50 additions & 52 deletions components/restapi/management/peers.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package management

import (
"context"
"time"

"github.com/labstack/echo/v4"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -32,8 +29,47 @@ func parsePeerIDParam(c echo.Context) (peer.ID, error) {
return peerID, nil
}

// getPeerInfo returns the peer info for the given neighbor.
func getPeerInfo(neighbor network.Neighbor) *api.PeerInfo {
// getPeerInfoFromNeighbor returns the peer info for the given neighbor.
func getPeerInfoFromPeer(peer *network.Peer) *api.PeerInfo {
multiAddresses := make([]iotago.PrefixedStringUint8, len(peer.PeerAddresses))
for i, multiAddress := range peer.PeerAddresses {
multiAddresses[i] = iotago.PrefixedStringUint8(multiAddress.String())
}

var alias string
relation := PeerRelationAutopeered

if peerConfigItem := deps.PeeringConfigManager.Peer(peer.ID); peerConfigItem != nil {
alias = peerConfigItem.Alias

// if the peer exists in the config, it is a manual peered peer
relation = PeerRelationManual
}

packetsReceived := uint32(0)
packetsSent := uint32(0)

neighbor, err := deps.NetworkManager.Neighbor(peer.ID)
if err == nil {
packetsReceived = uint32(neighbor.PacketsRead())
packetsSent = uint32(neighbor.PacketsWritten())
}

return &api.PeerInfo{
ID: peer.ID.String(),
MultiAddresses: multiAddresses,
Alias: alias,
Relation: relation,
Connected: peer.ConnStatus.Load() == network.ConnStatusConnected,
GossipMetrics: &api.PeerGossipMetrics{
PacketsReceived: packetsReceived,
PacketsSent: packetsSent,
},
}
}

// getPeerInfoFromNeighbor returns the peer info for the given neighbor.
func getPeerInfoFromNeighbor(neighbor network.Neighbor) *api.PeerInfo {
peer := neighbor.Peer()

multiAddresses := make([]iotago.PrefixedStringUint8, len(peer.PeerAddresses))
Expand Down Expand Up @@ -80,7 +116,7 @@ func getPeer(c echo.Context) (*api.PeerInfo, error) {
return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to get peer: %w", err)
}

return getPeerInfo(neighbor), nil
return getPeerInfoFromNeighbor(neighbor), nil
}

// removePeer drops the connection to the peer with the given peerID and removes it from the known peers.
Expand All @@ -93,7 +129,7 @@ func removePeer(c echo.Context) error {
// error is ignored because we don't care about the config here
_ = deps.PeeringConfigManager.RemovePeer(peerID)

return deps.NetworkManager.RemoveNeighbor(peerID)
return deps.NetworkManager.RemovePeer(peerID)
}

// listPeers returns the list of all peers.
Expand All @@ -105,13 +141,13 @@ func listPeers() *api.PeersResponse {
}

for i, info := range allNeighbors {
result.Peers[i] = getPeerInfo(info)
result.Peers[i] = getPeerInfoFromNeighbor(info)
}

return result
}

// addPeer tries to establish a connection to the given peer and adds it to the known peers.
// addPeer adds the peer with the given multiAddress to the manual peering layer.
func addPeer(c echo.Context) (*api.PeerInfo, error) {
request := &api.AddPeerRequest{}

Expand All @@ -124,52 +160,14 @@ func addPeer(c echo.Context) (*api.PeerInfo, error) {
return nil, ierrors.WithMessagef(httpserver.ErrInvalidParameter, "invalid multiAddress (%s): %w", request.MultiAddress, err)
}

addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddr)
_, err = peer.AddrInfoFromP2pAddr(multiAddr)
if err != nil {
return nil, ierrors.WithMessagef(httpserver.ErrInvalidParameter, "invalid address info from multiAddress (%s): %w", request.MultiAddress, err)
}

// we don't need to add the peer if it is already marked as a manual peer
if deps.NetworkManager.ManualNeighborExists(addrInfo.ID) {
return nil, ierrors.WithMessagef(echo.ErrBadRequest, "manual peer already exists, peerID: %s", addrInfo.ID.String())
}

connectedCtx, connectedCtxCancel := context.WithTimeout(c.Request().Context(), 5*time.Second)
defer connectedCtxCancel()

// hook to the event so we wait until the peer is connected
unhook := deps.NetworkManager.OnNeighborAdded(func(neighbor network.Neighbor) {
if neighbor.Peer().ID == addrInfo.ID {
// cancel the context to stop waiting
connectedCtxCancel()
}
}).Unhook
defer unhook()

// if the peer was already connected, we don't need to wait for it, but we still want to add
// it to the manual peers.
if deps.NetworkManager.NeighborExists(addrInfo.ID) {
connectedCtxCancel()
}

if err := deps.NetworkManager.AddManualPeers(multiAddr); err != nil {
return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to add peer: %w", err)
}

// wait for the peer to be added or the context to be done
<-connectedCtx.Done()
if ierrors.Is(connectedCtx.Err(), context.DeadlineExceeded) {
return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to add peer: timeout")
}

peerID := addrInfo.ID
neighbor, err := deps.NetworkManager.Neighbor(peerID)
peer, err := deps.NetworkManager.AddManualPeer(multiAddr)
if err != nil {
if ierrors.Is(err, network.ErrUnknownPeer) {
return nil, ierrors.WithMessagef(echo.ErrNotFound, "peer not found, peerID: %s", peerID.String())
}

return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to get peer: %w", err)
return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to add peer: %w", err)
}

var alias string
Expand All @@ -179,8 +177,8 @@ func addPeer(c echo.Context) (*api.PeerInfo, error) {

// error is ignored because we don't care about the config here
if err := deps.PeeringConfigManager.AddPeer(multiAddr, alias); err != nil {
Component.LogWarnf("failed to add peer to config, peerID: %s, err: %s", peerID.String(), err.Error())
Component.LogWarnf("failed to add peer to config, peerID: %s, err: %s", peer.ID.String(), err.Error())
}

return getPeerInfo(neighbor), nil
return getPeerInfoFromPeer(peer), nil
}
25 changes: 17 additions & 8 deletions pkg/network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,38 @@ import (
"github.com/iotaledger/hive.go/runtime/event"
)

// Manager is the network manager interface.
// Peer is a known node in the network.
// Neighbor is a Peer with an established connection in the gossip layer.
type Manager interface {
Endpoint

// DialPeer connects to a peer.
DialPeer(ctx context.Context, peer *Peer) error
// RemovePeer disconnects the peer with the given ID
// and removes it from manual peering in case it was added manually.
RemovePeer(peerID peer.ID) error
// AddManualPeer adds a manual peer to the list of known peers.
AddManualPeer(multiAddress multiaddr.Multiaddr) (*Peer, error)
// GetManualPeers returns all the manual peers.
GetManualPeers(onlyConnected ...bool) []*Peer

// OnNeighborAdded registers a callback that gets triggered when a neighbor is added.
OnNeighborAdded(handler func(Neighbor)) *event.Hook[func(Neighbor)]
// OnNeighborRemoved registers a callback that gets triggered when a neighbor is removed.
OnNeighborRemoved(handler func(Neighbor)) *event.Hook[func(Neighbor)]

// Neighbor returns the neighbor with the given ID.
Neighbor(peerID peer.ID) (Neighbor, error)
// NeighborExists checks if a neighbor with the given ID exists.
NeighborExists(peerID peer.ID) bool
// ManualNeighborExists checks if a neighbor with the given ID exists in the manual peering layer.
ManualNeighborExists(peerID peer.ID) bool
// RemoveNeighbor disconnects the neighbor with the given ID
// and removes it from manual peering in case it was added manually.
RemoveNeighbor(peerID peer.ID) error
// DropNeighbor disconnects the neighbor with the given ID.
DropNeighbor(peerID peer.ID) error
// DisconnectNeighbor disconnects the neighbor with the given ID.
DisconnectNeighbor(peerID peer.ID) error

// AllNeighbors returns all the neighbors that are currently connected.
AllNeighbors() []Neighbor
// AutopeeringNeighbors returns all the neighbors that are currently connected via autopeering.
AutopeeringNeighbors() []Neighbor
AddManualPeers(multiAddresses ...multiaddr.Multiaddr) error

P2PHost() host.Host

Expand Down
2 changes: 1 addition & 1 deletion pkg/network/neighbor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package network

// Neighbor is a Peer with an active connection.
// Neighbor is a Peer with an established connection in the gossip layer.
type Neighbor interface {
Peer() *Peer
PacketsRead() uint64
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/p2p/autopeering/autopeering.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (m *Manager) discoverAndDialPeers() {
neighborsToDrop := randomSubset(autopeeringNeighbors, -peersToFind)
m.logger.LogDebugf("Too many autopeering neighbors connected %d, disconnecting some", len(neighborsToDrop))
for _, peer := range neighborsToDrop {
if err := m.networkManager.DropNeighbor(peer.Peer().ID); err != nil {
if err := m.networkManager.DisconnectNeighbor(peer.Peer().ID); err != nil {
m.logger.LogDebugf("Failed to disconnect neighbor %s", peer.Peer().ID)
}
}
Expand Down
30 changes: 18 additions & 12 deletions pkg/network/p2p/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error {
return ierrors.New("no protocol handler registered to dial peer")
}

// Do not try to dial already connected peers.
if m.NeighborExists(peer.ID) {
return ierrors.WithMessagef(network.ErrDuplicatePeer, "peer %s already exists", peer.ID.String())
}
Expand Down Expand Up @@ -193,8 +194,12 @@ func (m *Manager) Shutdown() {
}
}

func (m *Manager) AddManualPeers(peers ...multiaddr.Multiaddr) error {
return m.manualPeering.AddPeers(peers...)
func (m *Manager) AddManualPeer(multiAddr multiaddr.Multiaddr) (*network.Peer, error) {
return m.manualPeering.AddPeer(multiAddr)
}

func (m *Manager) GetManualPeers(onlyConnected ...bool) []*network.Peer {
return m.manualPeering.GetPeers(onlyConnected...)
}

// LocalPeerID returns the local peer ID.
Expand All @@ -207,27 +212,27 @@ func (m *Manager) P2PHost() host.Host {
return m.libp2pHost
}

// RemoveNeighbor disconnects the neighbor with the given ID
// RemovePeer disconnects the neighbor with the given ID
// and removes it from manual peering in case it was added manually.
func (m *Manager) RemoveNeighbor(id peer.ID) error {
func (m *Manager) RemovePeer(id peer.ID) error {
if m.manualPeering.IsPeerKnown(id) {
// RemovePeer calls DropNeighbor internally
// RemovePeer calls DisconnectNeighbor internally
if err := m.manualPeering.RemovePeer(id); err != nil {
return err
}

return nil
}

if err := m.DropNeighbor(id); err != nil && !ierrors.Is(err, network.ErrUnknownPeer) {
if err := m.DisconnectNeighbor(id); err != nil && !ierrors.Is(err, network.ErrUnknownPeer) {
return ierrors.Wrapf(err, "failed to drop peer %s in the gossip layer", id.String())
}

return nil
}

// DropNeighbor disconnects the neighbor with the given ID.
func (m *Manager) DropNeighbor(id peer.ID) error {
// DisconnectNeighbor disconnects the neighbor with the given ID.
func (m *Manager) DisconnectNeighbor(id peer.ID) error {
nbr, err := m.neighbor(id)
if err != nil {
return ierrors.WithStack(err)
Expand All @@ -251,6 +256,7 @@ func (m *Manager) Send(packet proto.Message, to ...peer.ID) {
}
}

// AllNeighbors returns all the neighbors that are currently connected.
func (m *Manager) AllNeighbors() []network.Neighbor {
neighbors := m.allNeighbors()
result := make([]network.Neighbor, len(neighbors))
Expand All @@ -266,6 +272,7 @@ func (m *Manager) allNeighbors() []*neighbor {
return m.neighbors.Values()
}

// AutopeeringNeighbors returns all the neighbors that are currently connected via autopeering.
func (m *Manager) AutopeeringNeighbors() []network.Neighbor {
return lo.Filter(m.AllNeighbors(), func(n network.Neighbor) bool {
return !m.manualPeering.IsPeerKnown(n.Peer().ID)
Expand Down Expand Up @@ -369,11 +376,14 @@ func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *Packe
if peer.ID == m.libp2pHost.ID() {
return ierrors.WithStack(network.ErrLoopbackPeer)
}

m.shutdownMutex.RLock()
defer m.shutdownMutex.RUnlock()

if m.isShutdown {
return network.ErrNotRunning
}

if m.NeighborExists(peer.ID) {
return ierrors.WithStack(network.ErrDuplicatePeer)
}
Expand Down Expand Up @@ -428,10 +438,6 @@ func (m *Manager) NeighborExists(id peer.ID) bool {
return m.neighbors.Has(id)
}

func (m *Manager) ManualNeighborExists(id peer.ID) bool {
return m.manualPeering.IsPeerKnown(id)
}

func (m *Manager) deleteNeighbor(nbr *neighbor) {
// Close the connection to the peer.
_ = m.libp2pHost.Network().ClosePeer(nbr.Peer().ID)
Expand Down
Loading

0 comments on commit 9c61b05

Please sign in to comment.