Skip to content

Commit

Permalink
Merge pull request #875 from iotaledger/fix/peers-api
Browse files Browse the repository at this point in the history
Fix peers API
  • Loading branch information
muXxer authored Mar 27, 2024
2 parents a12e99a + 7f3cb3a commit 46dab61
Show file tree
Hide file tree
Showing 15 changed files with 168 additions and 130 deletions.
2 changes: 1 addition & 1 deletion components/dashboard/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func neighborMetrics() []neighbormetric {
}

// gossip plugin might be disabled
neighbors := deps.NetworkManager.AllNeighbors()
neighbors := deps.NetworkManager.Neighbors()
if neighbors == nil {
return []neighbormetric{}
}
Expand Down
2 changes: 1 addition & 1 deletion components/p2p/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func connectConfigKnownPeers() {
Component.LogPanicf("invalid peer address info: %s", err)
}

if err := deps.NetworkManager.AddManualPeers(multiAddr); err != nil {
if _, err := deps.NetworkManager.AddManualPeer(multiAddr); err != nil {
Component.LogInfof("failed to add peer: %s, error: %s", multiAddr.String(), err)
}
}
Expand Down
90 changes: 56 additions & 34 deletions components/restapi/management/peers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package management

import (
"sort"

"github.com/labstack/echo/v4"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
Expand All @@ -21,18 +23,16 @@ const (

// parsePeerIDParam parses the peerID parameter from the request.
func parsePeerIDParam(c echo.Context) (peer.ID, error) {
peerID, err := peer.Decode(c.Param("peerID"))
peerID, err := peer.Decode(c.Param(api.ParameterPeerID))
if err != nil {
return "", ierrors.WithMessagef(httpserver.ErrInvalidParameter, "invalid peerID: %w", err)
}

return peerID, nil
}

// getPeerInfo returns the peer info for the given neighbor.
func getPeerInfo(neighbor network.Neighbor) *api.PeerInfo {
peer := neighbor.Peer()

// getPeerInfoFromNeighbor returns the peer info for the given peer.
func getPeerInfoFromPeer(peer *network.Peer) *api.PeerInfo {
multiAddresses := make([]iotago.PrefixedStringUint8, len(peer.PeerAddresses))
for i, multiAddress := range peer.PeerAddresses {
multiAddresses[i] = iotago.PrefixedStringUint8(multiAddress.String())
Expand All @@ -41,22 +41,30 @@ func getPeerInfo(neighbor network.Neighbor) *api.PeerInfo {
var alias string
relation := PeerRelationAutopeered

if peerConfigItem := deps.PeeringConfigManager.Peer(neighbor.Peer().ID); peerConfigItem != nil {
if peerConfigItem := deps.PeeringConfigManager.Peer(peer.ID); peerConfigItem != nil {
alias = peerConfigItem.Alias

// if the peer exists in the config, it is a manual peered peer
relation = PeerRelationManual
}

packetsReceived := uint32(0)
packetsSent := uint32(0)

if neighbor, err := deps.NetworkManager.Neighbor(peer.ID); err == nil {
packetsReceived = uint32(neighbor.PacketsRead())
packetsSent = uint32(neighbor.PacketsWritten())
}

return &api.PeerInfo{
ID: peer.ID.String(),
MultiAddresses: multiAddresses,
Alias: alias,
Relation: relation,
Connected: peer.ConnStatus.Load() == network.ConnStatusConnected,
GossipMetrics: &api.PeerGossipMetrics{
PacketsReceived: uint32(neighbor.PacketsRead()),
PacketsSent: uint32(neighbor.PacketsWritten()),
PacketsReceived: packetsReceived,
PacketsSent: packetsSent,
},
}
}
Expand All @@ -68,7 +76,13 @@ func getPeer(c echo.Context) (*api.PeerInfo, error) {
return nil, err
}

neighbor, err := deps.NetworkManager.Neighbor(peerID)
// check connected neighbors first
if neighbor, err := deps.NetworkManager.Neighbor(peerID); err == nil {
return getPeerInfoFromPeer(neighbor.Peer()), nil
}

// if the peer is not connected, check the manual peers
peer, err := deps.NetworkManager.ManualPeer(peerID)
if err != nil {
if ierrors.Is(err, network.ErrUnknownPeer) {
return nil, ierrors.WithMessagef(echo.ErrNotFound, "peer not found, peerID: %s", peerID.String())
Expand All @@ -77,7 +91,7 @@ func getPeer(c echo.Context) (*api.PeerInfo, error) {
return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to get peer: %w", err)
}

return getPeerInfo(neighbor), nil
return getPeerInfoFromPeer(peer), nil
}

// removePeer drops the connection to the peer with the given peerID and removes it from the known peers.
Expand All @@ -90,25 +104,42 @@ func removePeer(c echo.Context) error {
// error is ignored because we don't care about the config here
_ = deps.PeeringConfigManager.RemovePeer(peerID)

return deps.NetworkManager.DropNeighbor(peerID)
return deps.NetworkManager.RemovePeer(peerID)
}

// listPeers returns the list of all peers.
func listPeers() *api.PeersResponse {
allNeighbors := deps.NetworkManager.AllNeighbors()
// get all known manual peers
manualPeers := deps.NetworkManager.ManualPeers()

// get all connected neighbors
allNeighbors := deps.NetworkManager.Neighbors()

result := &api.PeersResponse{
Peers: make([]*api.PeerInfo, len(allNeighbors)),
peersMap := make(map[peer.ID]*network.Peer)
for _, peer := range manualPeers {
peersMap[peer.ID] = peer
}

for i, info := range allNeighbors {
result.Peers[i] = getPeerInfo(info)
for _, neighbor := range allNeighbors {
// it's no problem if the peer is already in the map
peersMap[neighbor.Peer().ID] = neighbor.Peer()
}

return result
peers := make([]*api.PeerInfo, 0, len(peersMap))
for _, peer := range peersMap {
peers = append(peers, getPeerInfoFromPeer(peer))
}

sort.Slice(peers, func(i, j int) bool {
return peers[i].ID < peers[j].ID
})

return &api.PeersResponse{
Peers: peers,
}
}

// addPeer tries to establish a connection to the given peer and adds it to the known peers.
// addPeer adds the peer with the given multiAddress to the manual peering layer.
func addPeer(c echo.Context) (*api.PeerInfo, error) {
request := &api.AddPeerRequest{}

Expand All @@ -118,26 +149,17 @@ func addPeer(c echo.Context) (*api.PeerInfo, error) {

multiAddr, err := multiaddr.NewMultiaddr(request.MultiAddress)
if err != nil {
return nil, ierrors.WithMessagef(httpserver.ErrInvalidParameter, "invalid multiAddress: %w", err)
return nil, ierrors.WithMessagef(httpserver.ErrInvalidParameter, "invalid multiAddress (%s): %w", request.MultiAddress, err)
}

addrInfo, err := peer.AddrInfoFromP2pAddr(multiAddr)
_, err = peer.AddrInfoFromP2pAddr(multiAddr)
if err != nil {
return nil, ierrors.WithMessagef(httpserver.ErrInvalidParameter, "invalid multiAddress: %w", err)
return nil, ierrors.WithMessagef(httpserver.ErrInvalidParameter, "invalid address info from multiAddress (%s): %w", request.MultiAddress, err)
}

if err := deps.NetworkManager.AddManualPeers(multiAddr); err != nil {
return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to add peer: %w", err)
}

peerID := addrInfo.ID
neighbor, err := deps.NetworkManager.Neighbor(peerID)
peer, err := deps.NetworkManager.AddManualPeer(multiAddr)
if err != nil {
if ierrors.Is(err, network.ErrUnknownPeer) {
return nil, ierrors.WithMessagef(echo.ErrNotFound, "peer not found, peerID: %s", peerID.String())
}

return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to get peer: %w", err)
return nil, ierrors.WithMessagef(echo.ErrInternalServerError, "failed to add peer: %w", err)
}

var alias string
Expand All @@ -147,8 +169,8 @@ func addPeer(c echo.Context) (*api.PeerInfo, error) {

// error is ignored because we don't care about the config here
if err := deps.PeeringConfigManager.AddPeer(multiAddr, alias); err != nil {
Component.LogWarnf("failed to add peer to config, peerID: %s, err: %s", peerID.String(), err.Error())
Component.LogWarnf("failed to add peer to config, peerID: %s, err: %s", peer.ID.String(), err.Error())
}

return getPeerInfo(neighbor), nil
return getPeerInfoFromPeer(peer), nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/iotaledger/hive.go/stringify v0.0.0-20240326102522-2e37ab3611a3
github.com/iotaledger/inx-app v1.0.0-rc.3.0.20240307101848-db58eb9353ec
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240307100839-48553e1d2022
github.com/iotaledger/iota.go/v4 v4.0.0-20240322114706-82a1f8a8b70c
github.com/iotaledger/iota.go/v4 v4.0.0-20240325092426-32979eef3205
github.com/labstack/echo/v4 v4.11.4
github.com/labstack/gommon v0.4.2
github.com/libp2p/go-libp2p v0.33.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240307100839-48553e1d2022 h1:I178Sa
github.com/iotaledger/inx/go v1.0.0-rc.2.0.20240307100839-48553e1d2022/go.mod h1:jTFxIWiMUdAwO263jlJCSWcNLqEkgYEVOFXfjp5aNJM=
github.com/iotaledger/iota-crypto-demo v0.0.0-20240320124000-d02f37a4fdff h1:Do8fakxvFaj7dLckoo/z+mRyBdZo8QvT8HcgnQlG2Sg=
github.com/iotaledger/iota-crypto-demo v0.0.0-20240320124000-d02f37a4fdff/go.mod h1:aVEutEWFnhDNJBxtVuzy2BeTN+8FAlnR83k7hKV0CFE=
github.com/iotaledger/iota.go/v4 v4.0.0-20240322114706-82a1f8a8b70c h1:0uqpCv2txjbVi1E5AFvXkUGmTMiEX1nPzmTFH1Bfk6c=
github.com/iotaledger/iota.go/v4 v4.0.0-20240322114706-82a1f8a8b70c/go.mod h1:qn/63CB0/jE1em6ewqDSiz+ovS+E/os7K5b7g2pmJFg=
github.com/iotaledger/iota.go/v4 v4.0.0-20240325092426-32979eef3205 h1:nn7nCEezVLmFExiONAJ2XAgZKOJJ+iWrwfDBFdYTKSY=
github.com/iotaledger/iota.go/v4 v4.0.0-20240325092426-32979eef3205/go.mod h1:qn/63CB0/jE1em6ewqDSiz+ovS+E/os7K5b7g2pmJFg=
github.com/ipfs/boxo v0.18.0 h1:MOL9/AgoV3e7jlVMInicaSdbgralfqSsbkc31dZ9tmw=
github.com/ipfs/boxo v0.18.0/go.mod h1:pIZgTWdm3k3pLF9Uq6MB8JEcW07UDwNJjlXW1HELW80=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ var (
// ErrNotRunning is returned when a peer is added to a stopped or not yet started network manager.
ErrNotRunning = ierrors.New("manager not running")
// ErrUnknownPeer is returned when the specified peer is not known to the network manager.
ErrUnknownPeer = ierrors.New("unknown neighbor")
ErrUnknownPeer = ierrors.New("unknown peer")
// ErrLoopbackPeer is returned when the own peer is added.
ErrLoopbackPeer = ierrors.New("loopback connection not allowed")
// ErrDuplicatePeer is returned when the same peer is added more than once.
Expand Down
32 changes: 25 additions & 7 deletions pkg/network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,43 @@ import (
"github.com/iotaledger/hive.go/runtime/event"
)

// Manager is the network manager interface.
// Peer is a known node in the network.
// Neighbor is a Peer with an established connection in the gossip layer.
type Manager interface {
Endpoint

// DialPeer connects to a peer.
DialPeer(ctx context.Context, peer *Peer) error

// RemovePeer disconnects the peer with the given ID
// and removes it from manual peering in case it was added manually.
RemovePeer(peerID peer.ID) error
// AddManualPeer adds a manual peer to the list of known peers.
AddManualPeer(multiAddress multiaddr.Multiaddr) (*Peer, error)
// ManualPeer returns the manual peer with the given ID.
ManualPeer(peerID peer.ID) (*Peer, error)
// ManualPeers returns all the manual peers.
ManualPeers(onlyConnected ...bool) []*Peer

// OnNeighborAdded registers a callback that gets triggered when a neighbor is added.
OnNeighborAdded(handler func(Neighbor)) *event.Hook[func(Neighbor)]
// OnNeighborRemoved registers a callback that gets triggered when a neighbor is removed.
OnNeighborRemoved(handler func(Neighbor)) *event.Hook[func(Neighbor)]

// Neighbor returns the neighbor with the given ID.
Neighbor(peerID peer.ID) (Neighbor, error)
AllNeighbors() []Neighbor
AutopeeringNeighbors() []Neighbor

DropNeighbor(peerID peer.ID) error
// NeighborExists checks if a neighbor with the given ID exists.
NeighborExists(peerID peer.ID) bool
// DisconnectNeighbor disconnects the neighbor with the given ID.
DisconnectNeighbor(peerID peer.ID) error

// Neighbors returns all the neighbors that are currently connected.
Neighbors() []Neighbor
// AutopeeringNeighbors returns all the neighbors that are currently connected via autopeering.
AutopeeringNeighbors() []Neighbor

P2PHost() host.Host

Start(ctx context.Context, networkID string) error
Shutdown()

AddManualPeers(multiAddresses ...multiaddr.Multiaddr) error
}
1 change: 1 addition & 0 deletions pkg/network/neighbor.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package network

// Neighbor is a Peer with an established connection in the gossip layer.
type Neighbor interface {
Peer() *Peer
PacketsRead() uint64
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/p2p/autopeering/autopeering.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (m *Manager) discoverAndDialPeers() {
neighborsToDrop := randomSubset(autopeeringNeighbors, -peersToFind)
m.logger.LogDebugf("Too many autopeering neighbors connected %d, disconnecting some", len(neighborsToDrop))
for _, peer := range neighborsToDrop {
if err := m.networkManager.DropNeighbor(peer.Peer().ID); err != nil {
if err := m.networkManager.DisconnectNeighbor(peer.Peer().ID); err != nil {
m.logger.LogDebugf("Failed to disconnect neighbor %s", peer.Peer().ID)
}
}
Expand Down
45 changes: 39 additions & 6 deletions pkg/network/p2p/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (m *Manager) DialPeer(ctx context.Context, peer *network.Peer) error {
return ierrors.New("no protocol handler registered to dial peer")
}

// Do not try to dial already connected peers.
if m.NeighborExists(peer.ID) {
return ierrors.WithMessagef(network.ErrDuplicatePeer, "peer %s already exists", peer.ID.String())
}
Expand Down Expand Up @@ -193,8 +194,16 @@ func (m *Manager) Shutdown() {
}
}

func (m *Manager) AddManualPeers(peers ...multiaddr.Multiaddr) error {
return m.manualPeering.AddPeers(peers...)
func (m *Manager) AddManualPeer(multiAddr multiaddr.Multiaddr) (*network.Peer, error) {
return m.manualPeering.AddPeer(multiAddr)
}

func (m *Manager) ManualPeer(id peer.ID) (*network.Peer, error) {
return m.manualPeering.Peer(id)
}

func (m *Manager) ManualPeers(onlyConnected ...bool) []*network.Peer {
return m.manualPeering.GetPeers(onlyConnected...)
}

// LocalPeerID returns the local peer ID.
Expand All @@ -207,8 +216,27 @@ func (m *Manager) P2PHost() host.Host {
return m.libp2pHost
}

// DropNeighbor disconnects the neighbor with the given ID and the group.
func (m *Manager) DropNeighbor(id peer.ID) error {
// RemovePeer disconnects the neighbor with the given ID
// and removes it from manual peering in case it was added manually.
func (m *Manager) RemovePeer(id peer.ID) error {
if m.manualPeering.IsPeerKnown(id) {
// RemovePeer calls DisconnectNeighbor internally
if err := m.manualPeering.RemovePeer(id); err != nil {
return err
}

return nil
}

if err := m.DisconnectNeighbor(id); err != nil && !ierrors.Is(err, network.ErrUnknownPeer) {
return ierrors.Wrapf(err, "failed to drop peer %s in the gossip layer", id.String())
}

return nil
}

// DisconnectNeighbor disconnects the neighbor with the given ID.
func (m *Manager) DisconnectNeighbor(id peer.ID) error {
nbr, err := m.neighbor(id)
if err != nil {
return ierrors.WithStack(err)
Expand All @@ -232,7 +260,8 @@ func (m *Manager) Send(packet proto.Message, to ...peer.ID) {
}
}

func (m *Manager) AllNeighbors() []network.Neighbor {
// Neighbors returns all the neighbors that are currently connected.
func (m *Manager) Neighbors() []network.Neighbor {
neighbors := m.allNeighbors()
result := make([]network.Neighbor, len(neighbors))
for i, n := range neighbors {
Expand All @@ -247,8 +276,9 @@ func (m *Manager) allNeighbors() []*neighbor {
return m.neighbors.Values()
}

// AutopeeringNeighbors returns all the neighbors that are currently connected via autopeering.
func (m *Manager) AutopeeringNeighbors() []network.Neighbor {
return lo.Filter(m.AllNeighbors(), func(n network.Neighbor) bool {
return lo.Filter(m.Neighbors(), func(n network.Neighbor) bool {
return !m.manualPeering.IsPeerKnown(n.Peer().ID)
})
}
Expand Down Expand Up @@ -350,11 +380,14 @@ func (m *Manager) addNeighbor(ctx context.Context, peer *network.Peer, ps *Packe
if peer.ID == m.libp2pHost.ID() {
return ierrors.WithStack(network.ErrLoopbackPeer)
}

m.shutdownMutex.RLock()
defer m.shutdownMutex.RUnlock()

if m.isShutdown {
return network.ErrNotRunning
}

if m.NeighborExists(peer.ID) {
return ierrors.WithStack(network.ErrDuplicatePeer)
}
Expand Down
Loading

0 comments on commit 46dab61

Please sign in to comment.