From 771070ccd12d7325e725156b5a027f352f190270 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Mart=C3=AD?= Date: Tue, 27 Jul 2021 20:45:59 +0100 Subject: [PATCH] multirpc/subpub: set limits on libp2p discovery The library seems to generally have no default limit, and we've encountered gateways in some environments spiking to as many as 200K goroutines, so this seems like a likely culprit. And even if this is not the cause, setting a limit is a good idea anyway. As a start, we're using four times MaxDHTpeers as the discovery limit for both FindPeers and Advertise. It seems high enough to keep things working, but low enough that we can't run out of memory with hundreds of thousands of goroutines. Updates #243. --- multirpc/subpub/discovery.go | 11 ++++++++--- multirpc/subpub/peers.go | 8 +++++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/multirpc/subpub/discovery.go b/multirpc/subpub/discovery.go index ee8fbb52c..bd354554e 100644 --- a/multirpc/subpub/discovery.go +++ b/multirpc/subpub/discovery.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + corediscovery "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/protocol" "go.vocdoni.io/dvote/log" ) @@ -13,7 +14,8 @@ func (ps *SubPub) discover(ctx context.Context) { // Now, look for others who have announced. // This is like your friend telling you the location to meet you. log.Debugf("searching for SubPub group identity %s", ps.Topic) - peerChan, err := ps.routing.FindPeers(ctx, ps.Topic) + peerChan, err := ps.routing.FindPeers(ctx, ps.Topic, + corediscovery.Limit(4*ps.MaxDHTpeers)) if err != nil { log.Fatal(err) } @@ -79,7 +81,9 @@ func (ps *SubPub) Subscribe(ctx context.Context) { case <-ps.close: return default: - ps.discover(ctx) + pctx, cancel := context.WithTimeout(ctx, ps.DiscoveryPeriod) + ps.discover(pctx) + cancel() time.Sleep(ps.DiscoveryPeriod) } } @@ -97,7 +101,8 @@ func (ps *SubPub) advertise(ctx context.Context, topic string) { // The duration should be updated, and be in the order // of multiple hours. var err error - duration, err = ps.routing.Advertise(ctx, topic) + duration, err = ps.routing.Advertise(ctx, topic, + corediscovery.Limit(4*ps.MaxDHTpeers)) if err == nil && duration < time.Second { err = fmt.Errorf("refusing to advertise too often: %v", duration) } diff --git a/multirpc/subpub/peers.go b/multirpc/subpub/peers.go index 7b0bdafa0..2c612935d 100644 --- a/multirpc/subpub/peers.go +++ b/multirpc/subpub/peers.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + corediscovery "github.com/libp2p/go-libp2p-core/discovery" libpeer "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" multiaddr "github.com/multiformats/go-multiaddr" @@ -37,9 +38,10 @@ func (ps *SubPub) PeerStreamWrite(peerID string, msg []byte) error { // FindTopic opens one or multiple new streams with the peers announcing the namespace. // The callback function is executed once a new stream connection is created -func (ps *SubPub) FindTopic(namespace string, callback func(*bufio.ReadWriter)) error { +func (ps *SubPub) FindTopic(ctx context.Context, namespace string, callback func(*bufio.ReadWriter)) error { log.Infof("searching for topic %s", namespace) - peerChan, err := ps.routing.FindPeers(context.Background(), namespace) + peerChan, err := ps.routing.FindPeers(ctx, namespace, + corediscovery.Limit(4*ps.MaxDHTpeers)) if err != nil { return err } @@ -109,7 +111,7 @@ func (ps *SubPub) peersManager() { } } ps.PeersMu.Unlock() - tctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + tctx, cancel := context.WithTimeout(context.Background(), ps.CollectionPeriod) ps.Host.ConnManager().TrimOpenConns(tctx) // Not sure if it works cancel() time.Sleep(ps.CollectionPeriod)