From 9c61b0533d5d140d089d09e5be443a3f3be6de25 Mon Sep 17 00:00:00 2001 From: muXxer Date: Tue, 26 Mar 2024 18:29:13 +0100 Subject: [PATCH] Do not wait until peer is connected on `addPeer` --- components/p2p/component.go | 2 +- components/restapi/management/peers.go | 102 +++++++++--------- pkg/network/manager.go | 25 +++-- pkg/network/neighbor.go | 2 +- pkg/network/p2p/autopeering/autopeering.go | 2 +- pkg/network/p2p/manager.go | 30 +++--- .../p2p/manualpeering/manualpeering.go | 98 +++++------------ pkg/network/peer.go | 6 +- 8 files changed, 115 insertions(+), 152 deletions(-) diff --git a/components/p2p/component.go b/components/p2p/component.go index d241682c0..e6d7fbfc1 100644 --- a/components/p2p/component.go +++ b/components/p2p/component.go @@ -352,7 +352,7 @@ func connectConfigKnownPeers() { Component.LogPanicf("invalid peer address info: %s", err) } - if err := deps.NetworkManager.AddManualPeers(multiAddr); err != nil { + if _, err := deps.NetworkManager.AddManualPeer(multiAddr); err != nil { Component.LogInfof("failed to add peer: %s, error: %s", multiAddr.String(), err) } } diff --git a/components/restapi/management/peers.go b/components/restapi/management/peers.go index c1f9cbbcd..86462cff5 100644 --- a/components/restapi/management/peers.go +++ b/components/restapi/management/peers.go @@ -1,9 +1,6 @@ package management import ( - "context" - "time" - "github.com/labstack/echo/v4" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" @@ -32,8 +29,47 @@ func parsePeerIDParam(c echo.Context) (peer.ID, error) { return peerID, nil } -// getPeerInfo returns the peer info for the given neighbor. -func getPeerInfo(neighbor network.Neighbor) *api.PeerInfo { +// getPeerInfoFromNeighbor returns the peer info for the given neighbor. +func getPeerInfoFromPeer(peer *network.Peer) *api.PeerInfo { + multiAddresses := make([]iotago.PrefixedStringUint8, len(peer.PeerAddresses)) + for i, multiAddress := range peer.PeerAddresses { + multiAddresses[i] = iotago.PrefixedStringUint8(multiAddress.String()) + } + + var alias string + relation := PeerRelationAutopeered + + if peerConfigItem := deps.PeeringConfigManager.Peer(peer.ID); peerConfigItem != nil { + alias = peerConfigItem.Alias + + // if the peer exists in the config, it is a manual peered peer + relation = PeerRelationManual + } + + packetsReceived := uint32(0) + packetsSent := uint32(0) + + neighbor, err := deps.NetworkManager.Neighbor(peer.ID) + if err == nil { + packetsReceived = uint32(neighbor.PacketsRead()) + packetsSent = uint32(neighbor.PacketsWritten()) + } + + return &api.PeerInfo{ + ID: peer.ID.String(), + MultiAddresses: multiAddresses, + Alias: alias, + Relation: relation, + Connected: peer.ConnStatus.Load() == network.ConnStatusConnected, + GossipMetrics: &api.PeerGossipMetrics{ + PacketsReceived: packetsReceived, + PacketsSent: packetsSent, + }, + } +} + +// getPeerInfoFromNeighbor returns the peer info for the given neighbor. +func getPeerInfoFromNeighbor(neighbor network.Neighbor) *api.PeerInfo { peer := neighbor.Peer() multiAddresses := make([]iotago.PrefixedStringUint8, len(peer.PeerAddresses)) @@ -80,7 +116,7 @@ func getPeer(c echo.Context) (*api.PeerInfo, error) { return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to get peer: %w", err) } - return getPeerInfo(neighbor), nil + return getPeerInfoFromNeighbor(neighbor), nil } // removePeer drops the connection to the peer with the given peerID and removes it from the known peers. @@ -93,7 +129,7 @@ func removePeer(c echo.Context) error { // error is ignored because we don't care about the config here _ = deps.PeeringConfigManager.RemovePeer(peerID) - return deps.NetworkManager.RemoveNeighbor(peerID) + return deps.NetworkManager.RemovePeer(peerID) } // listPeers returns the list of all peers. @@ -105,13 +141,13 @@ func listPeers() *api.PeersResponse { } for i, info := range allNeighbors { - result.Peers[i] = getPeerInfo(info) + result.Peers[i] = getPeerInfoFromNeighbor(info) } return result } -// addPeer tries to establish a connection to the given peer and adds it to the known peers. +// addPeer adds the peer with the given multiAddress to the manual peering layer. func addPeer(c echo.Context) (*api.PeerInfo, error) { request := &api.AddPeerRequest{} @@ -124,52 +160,14 @@ func addPeer(c echo.Context) (*api.PeerInfo, error) { return nil, ierrors.WithMessagef(httpserver.ErrInvalidParameter, "invalid multiAddress (%s): %w", request.MultiAddress, err) } - addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddr) + _, err = peer.AddrInfoFromP2pAddr(multiAddr) if err != nil { return nil, ierrors.WithMessagef(httpserver.ErrInvalidParameter, "invalid address info from multiAddress (%s): %w", request.MultiAddress, err) } - // we don't need to add the peer if it is already marked as a manual peer - if deps.NetworkManager.ManualNeighborExists(addrInfo.ID) { - return nil, ierrors.WithMessagef(echo.ErrBadRequest, "manual peer already exists, peerID: %s", addrInfo.ID.String()) - } - - connectedCtx, connectedCtxCancel := context.WithTimeout(c.Request().Context(), 5*time.Second) - defer connectedCtxCancel() - - // hook to the event so we wait until the peer is connected - unhook := deps.NetworkManager.OnNeighborAdded(func(neighbor network.Neighbor) { - if neighbor.Peer().ID == addrInfo.ID { - // cancel the context to stop waiting - connectedCtxCancel() - } - }).Unhook - defer unhook() - - // if the peer was already connected, we don't need to wait for it, but we still want to add - // it to the manual peers. - if deps.NetworkManager.NeighborExists(addrInfo.ID) { - connectedCtxCancel() - } - - if err := deps.NetworkManager.AddManualPeers(multiAddr); err != nil { - return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to add peer: %w", err) - } - - // wait for the peer to be added or the context to be done - <-connectedCtx.Done() - if ierrors.Is(connectedCtx.Err(), context.DeadlineExceeded) { - return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to add peer: timeout") - } - - peerID := addrInfo.ID - neighbor, err := deps.NetworkManager.Neighbor(peerID) + peer, err := deps.NetworkManager.AddManualPeer(multiAddr) if err != nil { - if ierrors.Is(err, network.ErrUnknownPeer) { - return nil, ierrors.WithMessagef(echo.ErrNotFound, "peer not found, peerID: %s", peerID.String()) - } - - return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to get peer: %w", err) + return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to add peer: %w", err) } var alias string @@ -179,8 +177,8 @@ func addPeer(c echo.Context) (*api.PeerInfo, error) { // error is ignored because we don't care about the config here if err := deps.PeeringConfigManager.AddPeer(multiAddr, alias); err != nil { - Component.LogWarnf("failed to add peer to config, peerID: %s, err: %s", peerID.String(), err.Error()) + Component.LogWarnf("failed to add peer to config, peerID: %s, err: %s", peer.ID.String(), err.Error()) } - return getPeerInfo(neighbor), nil + return getPeerInfoFromPeer(peer), nil } diff --git a/pkg/network/manager.go b/pkg/network/manager.go index cbbb3c1b0..0489f39e3 100644 --- a/pkg/network/manager.go +++ b/pkg/network/manager.go @@ -10,29 +10,38 @@ import ( "github.com/iotaledger/hive.go/runtime/event" ) +// Manager is the network manager interface. +// Peer is a known node in the network. +// Neighbor is a Peer with an established connection in the gossip layer. type Manager interface { Endpoint + // DialPeer connects to a peer. DialPeer(ctx context.Context, peer *Peer) error + // RemovePeer disconnects the peer with the given ID + // and removes it from manual peering in case it was added manually. + RemovePeer(peerID peer.ID) error + // AddManualPeer adds a manual peer to the list of known peers. + AddManualPeer(multiAddress multiaddr.Multiaddr) (*Peer, error) + // GetManualPeers returns all the manual peers. + GetManualPeers(onlyConnected ...bool) []*Peer + // OnNeighborAdded registers a callback that gets triggered when a neighbor is added. OnNeighborAdded(handler func(Neighbor)) *event.Hook[func(Neighbor)] + // OnNeighborRemoved registers a callback that gets triggered when a neighbor is removed. OnNeighborRemoved(handler func(Neighbor)) *event.Hook[func(Neighbor)] // Neighbor returns the neighbor with the given ID. Neighbor(peerID peer.ID) (Neighbor, error) // NeighborExists checks if a neighbor with the given ID exists. NeighborExists(peerID peer.ID) bool - // ManualNeighborExists checks if a neighbor with the given ID exists in the manual peering layer. - ManualNeighborExists(peerID peer.ID) bool - // RemoveNeighbor disconnects the neighbor with the given ID - // and removes it from manual peering in case it was added manually. - RemoveNeighbor(peerID peer.ID) error - // DropNeighbor disconnects the neighbor with the given ID. - DropNeighbor(peerID peer.ID) error + // DisconnectNeighbor disconnects the neighbor with the given ID. + DisconnectNeighbor(peerID peer.ID) error + // AllNeighbors returns all the neighbors that are currently connected. AllNeighbors() []Neighbor + // AutopeeringNeighbors returns all the neighbors that are currently connected via autopeering. AutopeeringNeighbors() []Neighbor - AddManualPeers(multiAddresses ...multiaddr.Multiaddr) error P2PHost() host.Host diff --git a/pkg/network/neighbor.go b/pkg/network/neighbor.go index 1f0aa6da4..4591a7848 100644 --- a/pkg/network/neighbor.go +++ b/pkg/network/neighbor.go @@ -1,6 +1,6 @@ package network -// Neighbor is a Peer with an active connection. +// Neighbor is a Peer with an established connection in the gossip layer. type Neighbor interface { Peer() *Peer PacketsRead() uint64 diff --git a/pkg/network/p2p/autopeering/autopeering.go b/pkg/network/p2p/autopeering/autopeering.go index 0111dfe82..41976e1c2 100644 --- a/pkg/network/p2p/autopeering/autopeering.go +++ b/pkg/network/p2p/autopeering/autopeering.go @@ -214,7 +214,7 @@ func (m *Manager) discoverAndDialPeers() { neighborsToDrop := randomSubset(autopeeringNeighbors, -peersToFind) m.logger.LogDebugf("Too many autopeering neighbors connected %d, disconnecting some", len(neighborsToDrop)) for _, peer := range neighborsToDrop { - if err := m.networkManager.DropNeighbor(peer.Peer().ID); err != nil { + if err := m.networkManager.DisconnectNeighbor(peer.Peer().ID); err != nil { m.logger.LogDebugf("Failed to disconnect neighbor %s", peer.Peer().ID) } } diff --git a/pkg/network/p2p/manager.go b/pkg/network/p2p/manager.go index 938a61713..7a312d6b5 100644 --- a/pkg/network/p2p/manager.go +++ b/pkg/network/p2p/manager.go @@ -112,6 +112,7 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error { return ierrors.New("no protocol handler registered to dial peer") } + // Do not try to dial already connected peers. if m.NeighborExists(peer.ID) { return ierrors.WithMessagef(network.ErrDuplicatePeer, "peer %s already exists", peer.ID.String()) } @@ -193,8 +194,12 @@ func (m *Manager) Shutdown() { } } -func (m *Manager) AddManualPeers(peers ...multiaddr.Multiaddr) error { - return m.manualPeering.AddPeers(peers...) +func (m *Manager) AddManualPeer(multiAddr multiaddr.Multiaddr) (*network.Peer, error) { + return m.manualPeering.AddPeer(multiAddr) +} + +func (m *Manager) GetManualPeers(onlyConnected ...bool) []*network.Peer { + return m.manualPeering.GetPeers(onlyConnected...) } // LocalPeerID returns the local peer ID. @@ -207,11 +212,11 @@ func (m *Manager) P2PHost() host.Host { return m.libp2pHost } -// RemoveNeighbor disconnects the neighbor with the given ID +// RemovePeer disconnects the neighbor with the given ID // and removes it from manual peering in case it was added manually. -func (m *Manager) RemoveNeighbor(id peer.ID) error { +func (m *Manager) RemovePeer(id peer.ID) error { if m.manualPeering.IsPeerKnown(id) { - // RemovePeer calls DropNeighbor internally + // RemovePeer calls DisconnectNeighbor internally if err := m.manualPeering.RemovePeer(id); err != nil { return err } @@ -219,15 +224,15 @@ func (m *Manager) RemoveNeighbor(id peer.ID) error { return nil } - if err := m.DropNeighbor(id); err != nil && !ierrors.Is(err, network.ErrUnknownPeer) { + if err := m.DisconnectNeighbor(id); err != nil && !ierrors.Is(err, network.ErrUnknownPeer) { return ierrors.Wrapf(err, "failed to drop peer %s in the gossip layer", id.String()) } return nil } -// DropNeighbor disconnects the neighbor with the given ID. -func (m *Manager) DropNeighbor(id peer.ID) error { +// DisconnectNeighbor disconnects the neighbor with the given ID. +func (m *Manager) DisconnectNeighbor(id peer.ID) error { nbr, err := m.neighbor(id) if err != nil { return ierrors.WithStack(err) @@ -251,6 +256,7 @@ func (m *Manager) Send(packet proto.Message, to ...peer.ID) { } } +// AllNeighbors returns all the neighbors that are currently connected. func (m *Manager) AllNeighbors() []network.Neighbor { neighbors := m.allNeighbors() result := make([]network.Neighbor, len(neighbors)) @@ -266,6 +272,7 @@ func (m *Manager) allNeighbors() []*neighbor { return m.neighbors.Values() } +// AutopeeringNeighbors returns all the neighbors that are currently connected via autopeering. func (m *Manager) AutopeeringNeighbors() []network.Neighbor { return lo.Filter(m.AllNeighbors(), func(n network.Neighbor) bool { return !m.manualPeering.IsPeerKnown(n.Peer().ID) @@ -369,11 +376,14 @@ func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *Packe if peer.ID == m.libp2pHost.ID() { return ierrors.WithStack(network.ErrLoopbackPeer) } + m.shutdownMutex.RLock() defer m.shutdownMutex.RUnlock() + if m.isShutdown { return network.ErrNotRunning } + if m.NeighborExists(peer.ID) { return ierrors.WithStack(network.ErrDuplicatePeer) } @@ -428,10 +438,6 @@ func (m *Manager) NeighborExists(id peer.ID) bool { return m.neighbors.Has(id) } -func (m *Manager) ManualNeighborExists(id peer.ID) bool { - return m.manualPeering.IsPeerKnown(id) -} - func (m *Manager) deleteNeighbor(nbr *neighbor) { // Close the connection to the peer. _ = m.libp2pHost.Network().ClosePeer(nbr.Peer().ID) diff --git a/pkg/network/p2p/manualpeering/manualpeering.go b/pkg/network/p2p/manualpeering/manualpeering.go index cbc547bec..ee7950421 100644 --- a/pkg/network/p2p/manualpeering/manualpeering.go +++ b/pkg/network/p2p/manualpeering/manualpeering.go @@ -56,18 +56,6 @@ func NewManager(networkManager network.Manager, logger log.Logger) *Manager { return m } -// AddPeers adds multiple peers to the list of known peers. -func (m *Manager) AddPeers(peerAddrs ...multiaddr.Multiaddr) error { - var resultErr error - for _, peerAddr := range peerAddrs { - if err := m.addPeer(peerAddr); err != nil { - resultErr = err - } - } - - return resultErr -} - // RemovePeer removes a peer from the list of known peers. func (m *Manager) RemovePeer(peerID peer.ID) error { m.knownPeersMutex.Lock() @@ -87,62 +75,26 @@ func (m *Manager) RemovePeer(peerID peer.ID) error { m.networkManager.P2PHost().ConnManager().Unprotect(peerID, manualPeerProtectionTag) - if err := m.networkManager.DropNeighbor(peerID); err != nil && !ierrors.Is(err, network.ErrUnknownPeer) { + if err := m.networkManager.DisconnectNeighbor(peerID); err != nil && !ierrors.Is(err, network.ErrUnknownPeer) { return ierrors.Wrapf(err, "failed to drop known peer %s in the gossip layer", peerID.String()) } return nil } -// GetPeersConfig holds optional parameters for the GetPeers method. -type GetPeersConfig struct { - // If true, GetPeers returns peers that have actual connection established in the gossip layer. - OnlyConnected bool `json:"onlyConnected"` -} - -// GetPeersOption defines a single option for GetPeers method. -type GetPeersOption func(conf *GetPeersConfig) - -// BuildGetPeersConfig builds GetPeersConfig struct from a list of options. -func BuildGetPeersConfig(opts []GetPeersOption) *GetPeersConfig { - conf := &GetPeersConfig{} - for _, o := range opts { - o(conf) - } - - return conf -} - -// ToOptions translates config struct to a list of corresponding options. -func (c *GetPeersConfig) ToOptions() (opts []GetPeersOption) { - if c.OnlyConnected { - opts = append(opts, WithOnlyConnectedPeers()) - } - - return opts -} - -// WithOnlyConnectedPeers returns a GetPeersOption that sets OnlyConnected field to true. -func WithOnlyConnectedPeers() GetPeersOption { - return func(conf *GetPeersConfig) { - conf.OnlyConnected = true - } -} - // GetPeers returns the list of known peers. -func (m *Manager) GetPeers(opts ...GetPeersOption) []*network.PeerDescriptor { - conf := BuildGetPeersConfig(opts) +func (m *Manager) GetPeers(onlyConnected ...bool) []*network.Peer { m.knownPeersMutex.RLock() defer m.knownPeersMutex.RUnlock() - peers := make([]*network.PeerDescriptor, 0, len(m.knownPeers)) - for _, kp := range m.knownPeers { - connStatus := kp.GetConnStatus() - if !conf.OnlyConnected || connStatus == network.ConnStatusConnected { - peers = append(peers, &network.PeerDescriptor{ - Addresses: kp.PeerAddresses, - }) + peers := make([]*network.Peer, 0, len(m.knownPeers)) + for _, peer := range m.knownPeers { + if len(onlyConnected) > 0 && onlyConnected[0] && peer.GetConnStatus() != network.ConnStatusConnected { + // skip disconnected peers if onlyConnected is true + continue } + + peers = append(peers, peer) } return peers @@ -189,39 +141,41 @@ func (m *Manager) IsPeerKnown(id peer.ID) bool { return exists } -func (m *Manager) addPeer(peerAddr multiaddr.Multiaddr) error { +func (m *Manager) AddPeer(multiAddr multiaddr.Multiaddr) (*network.Peer, error) { if !m.isStarted.Load() { - return ierrors.New("manual peering manager hasn't been started yet") + return nil, ierrors.New("manual peering manager hasn't been started yet") } if m.isStopped { - return ierrors.New("manual peering manager was stopped") + return nil, ierrors.New("manual peering manager was stopped") } m.knownPeersMutex.Lock() defer m.knownPeersMutex.Unlock() - p, err := network.NewPeerFromMultiAddr(peerAddr) + newPeer, err := network.NewPeerFromMultiAddr(multiAddr) if err != nil { - return ierrors.WithStack(err) + return nil, ierrors.WithStack(err) } - // Do not add self - if p.ID == m.networkManager.P2PHost().ID() { - return ierrors.New("not adding self to the list of known peers") + // do not add ourselves to the list of known peers + if newPeer.ID == m.networkManager.P2PHost().ID() { + return nil, ierrors.New("not adding self to the list of known peers") } - if _, exists := m.knownPeers[p.ID]; exists { - return nil + if peer, exists := m.knownPeers[newPeer.ID]; exists { + return peer, nil } - m.logger.LogInfof("Adding new peer to the list of known peers in manual peering %s", p) - m.knownPeers[p.ID] = p + + m.logger.LogInfof("Adding new peer to the list of known peers in manual peering %s", newPeer) + m.knownPeers[newPeer.ID] = newPeer + go func() { - defer close(p.DoneCh) - m.keepPeerConnected(p) + defer close(newPeer.DoneCh) + m.keepPeerConnected(newPeer) }() - return nil + return newPeer, nil } func (m *Manager) removeAllKnownPeers() error { diff --git a/pkg/network/peer.go b/pkg/network/peer.go index e40e52224..5d32b9241 100644 --- a/pkg/network/peer.go +++ b/pkg/network/peer.go @@ -26,11 +26,7 @@ const ( ConnStatusConnected ConnectionStatus = "connected" ) -// PeerDescriptor defines a peer record in the manual peering layer. -type PeerDescriptor struct { - Addresses []multiaddr.Multiaddr `json:"addresses"` -} - +// Peer is a known node in the network. type Peer struct { ID peer.ID PublicKey ed25519.PublicKey