Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only gossip connected peers #38

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 52 additions & 14 deletions p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (pd *PeerDiscovery) addPeer(pinfo peer.AddrInfo) {
pd.knownPeers[pinfo.ID] = oldPinfo
}

// GetPeers returns all known peers
func (pd *PeerDiscovery) GetPeers() []peer.AddrInfo {
// GetKnownPeers returns all known peers
func (pd *PeerDiscovery) GetKnownPeers() []peer.AddrInfo {
pd.mu.RLock()
defer pd.mu.RUnlock()

Expand All @@ -106,6 +106,39 @@ func (pd *PeerDiscovery) GetPeers() []peer.AddrInfo {
return peers
}

// GetConnectedPeers returns all currently connected peers
func (pd *PeerDiscovery) GetConnectedPeers() []peer.AddrInfo {
conns := pd.host.Network().Conns()
peerMap := make(map[peer.ID]peer.AddrInfo)

for _, conn := range conns {
remotePeer := conn.RemotePeer()
remoteAddr := conn.RemoteMultiaddr()

if peerInfo, exists := peerMap[remotePeer]; exists {
// peer already in map, add the new address if it's not already there
if !multiaddr.Contains(peerInfo.Addrs, remoteAddr) {
peerInfo.Addrs = append(peerInfo.Addrs, remoteAddr)
peerMap[remotePeer] = peerInfo
}
} else {
// new peer, add to map
peerMap[remotePeer] = peer.AddrInfo{
ID: remotePeer,
Addrs: []multiaddr.Multiaddr{remoteAddr},
}
}
}

// flatten map
peers := make([]peer.AddrInfo, 0, len(peerMap))
for _, peerInfo := range peerMap {
peers = append(peers, peerInfo)
}

return peers
}
Comment on lines +109 to +140
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding concurrency protection and error handling.

While the implementation is functionally correct, consider these improvements:

  1. Add mutex protection as the method accesses shared network state
  2. Add error handling for edge cases (e.g., connection state changes during iteration)
 func (pd *PeerDiscovery) GetConnectedPeers() []peer.AddrInfo {
+    pd.mu.RLock()
+    defer pd.mu.RUnlock()
+
     conns := pd.host.Network().Conns()
     peerMap := make(map[peer.ID]peer.AddrInfo)
 
     for _, conn := range conns {
+        // Skip if connection is closing
+        if conn.Stat().Direction == network.DirUnknown {
+            continue
+        }
         remotePeer := conn.RemotePeer()
         remoteAddr := conn.RemoteMultiaddr()

Committable suggestion skipped: line range outside the PR's diff.


// handleDiscovery handles incoming discovery streams
func (pd *PeerDiscovery) handleDiscovery(s network.Stream) {
pd.logger.Debug().
Expand All @@ -121,8 +154,9 @@ func (pd *PeerDiscovery) handleDiscovery(s network.Stream) {
}
pd.addPeer(ai)

// Share our known peers
peers := pd.GetPeers()
// Share our connected peers
// we shouldn't share peers that are not actually connectable
peers := pd.GetConnectedPeers()
data, err := json.Marshal(peers)
if err != nil {
pd.logger.Error().Err(err).Msgf("Failed to marshal peers")
Expand Down Expand Up @@ -157,7 +191,7 @@ func (pd *PeerDiscovery) startGossip(ctx context.Context) {

func (pd *PeerDiscovery) gossipPeers(ctx context.Context) {
pd.logger.Debug().Msgf("Gossiping known peers")
peers := pd.GetPeers()
peers := pd.GetKnownPeers()
pd.logger.Debug().
Array("peers", zerolog.Arr().Interface(peers)).
Msgf("current peers")
Expand All @@ -178,16 +212,20 @@ func (pd *PeerDiscovery) gossipPeers(ctx context.Context) {
defer wg.Done()
defer func() { <-sem }()

err := pd.host.Connect(ctx, p)
if err != nil {
pd.logger.Error().Err(err).
Stringer("to", p.ID).
Msg("Failed to connect to peer")
return
// only connect to peer if no active connections
// to prevent interruptions if address is changing
if len(pd.host.Network().ConnsToPeer(p.ID)) == 0 {
err := pd.host.Connect(ctx, p)
if err != nil {
pd.logger.Error().Err(err).
Stringer("to", p.ID).
Msg("Failed to connect to peer")
return
}
pd.logger.Debug().
Stringer("to", p).
Msg("Connected to peer")
}
pd.logger.Debug().
Stringer("to", p).
Msg("Connected to peer")

// Open discovery stream
s, err := pd.host.NewStream(ctx, p.ID, DiscoveryProtocol)
Expand Down