Skip to content

Commit

Permalink
multirpc/subpub: set limits on libp2p discovery
Browse files Browse the repository at this point in the history
The library seems to generally have no default limit, and we've
encountered gateways in some environments spiking to as many as 200K
goroutines, so this seems like a likely culprit.

And even if this is not the cause, setting a limit is a good idea
anyway.

As a start, we're using four times MaxDHTpeers as the discovery limit
for both FindPeers and Advertise. It seems high enough to keep things
working, but low enough that we can't run out of memory with hundreds of
thousands of goroutines.

Updates #243.
  • Loading branch information
mvdan authored and p4u committed Jul 28, 2021
1 parent 53a7bf5 commit 771070c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 6 deletions.
11 changes: 8 additions & 3 deletions multirpc/subpub/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

corediscovery "github.com/libp2p/go-libp2p-core/discovery"
"github.com/libp2p/go-libp2p-core/protocol"
"go.vocdoni.io/dvote/log"
)
Expand All @@ -13,7 +14,8 @@ func (ps *SubPub) discover(ctx context.Context) {
// Now, look for others who have announced.
// This is like your friend telling you the location to meet you.
log.Debugf("searching for SubPub group identity %s", ps.Topic)
peerChan, err := ps.routing.FindPeers(ctx, ps.Topic)
peerChan, err := ps.routing.FindPeers(ctx, ps.Topic,
corediscovery.Limit(4*ps.MaxDHTpeers))
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -79,7 +81,9 @@ func (ps *SubPub) Subscribe(ctx context.Context) {
case <-ps.close:
return
default:
ps.discover(ctx)
pctx, cancel := context.WithTimeout(ctx, ps.DiscoveryPeriod)
ps.discover(pctx)
cancel()
time.Sleep(ps.DiscoveryPeriod)
}
}
Expand All @@ -97,7 +101,8 @@ func (ps *SubPub) advertise(ctx context.Context, topic string) {
// The duration should be updated, and be in the order
// of multiple hours.
var err error
duration, err = ps.routing.Advertise(ctx, topic)
duration, err = ps.routing.Advertise(ctx, topic,
corediscovery.Limit(4*ps.MaxDHTpeers))
if err == nil && duration < time.Second {
err = fmt.Errorf("refusing to advertise too often: %v", duration)
}
Expand Down
8 changes: 5 additions & 3 deletions multirpc/subpub/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

corediscovery "github.com/libp2p/go-libp2p-core/discovery"
libpeer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
multiaddr "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -37,9 +38,10 @@ func (ps *SubPub) PeerStreamWrite(peerID string, msg []byte) error {

// FindTopic opens one or multiple new streams with the peers announcing the namespace.
// The callback function is executed once a new stream connection is created
func (ps *SubPub) FindTopic(namespace string, callback func(*bufio.ReadWriter)) error {
func (ps *SubPub) FindTopic(ctx context.Context, namespace string, callback func(*bufio.ReadWriter)) error {
log.Infof("searching for topic %s", namespace)
peerChan, err := ps.routing.FindPeers(context.Background(), namespace)
peerChan, err := ps.routing.FindPeers(ctx, namespace,
corediscovery.Limit(4*ps.MaxDHTpeers))
if err != nil {
return err
}
Expand Down Expand Up @@ -109,7 +111,7 @@ func (ps *SubPub) peersManager() {
}
}
ps.PeersMu.Unlock()
tctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
tctx, cancel := context.WithTimeout(context.Background(), ps.CollectionPeriod)
ps.Host.ConnManager().TrimOpenConns(tctx) // Not sure if it works
cancel()
time.Sleep(ps.CollectionPeriod)
Expand Down

0 comments on commit 771070c

Please sign in to comment.