Skip to content

Commit

Permalink
accelerated-dht: cleanup peers from message sender on disconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Dec 27, 2024
1 parent 88ef336 commit e6be480
Showing 1 changed file with 37 additions and 5 deletions.
42 changes: 37 additions & 5 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/multiformats/go-multihash"

"github.com/libp2p/go-libp2p-routing-helpers/tracing"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -98,6 +100,8 @@ type FullRT struct {
bulkSendParallelism int

self peer.ID

peerConnectednessSubscriber event.Subscription
}

// NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
Expand Down Expand Up @@ -151,6 +155,11 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
}
}

sub, err := h.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("fullrt-dht"))
if err != nil {
return nil, fmt.Errorf("peer connectedness subscription failed: %w", err)
}

Check warning on line 161 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L158-L161

Added lines #L158 - L161 were not covered by tests

ctx, cancel := context.WithCancel(context.Background())

self := h.ID()
Expand Down Expand Up @@ -195,14 +204,14 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful

crawlerInterval: fullrtcfg.crawlInterval,

bulkSendParallelism: fullrtcfg.bulkSendParallelism,

self: self,
bulkSendParallelism: fullrtcfg.bulkSendParallelism,
self: self,
peerConnectednessSubscriber: sub,

Check warning on line 209 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L207-L209

Added lines #L207 - L209 were not covered by tests
}

rt.wg.Add(1)
rt.wg.Add(2)

Check warning on line 212 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L212

Added line #L212 was not covered by tests
go rt.runCrawler(ctx)

go rt.runSubscriber()

Check warning on line 214 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L214

Added line #L214 was not covered by tests
return rt, nil
}

Expand All @@ -211,6 +220,29 @@ type crawlVal struct {
key kadkey.Key
}

func (dht *FullRT) runSubscriber() {
defer dht.wg.Done()
ms, ok := dht.messageSender.(dht_pb.MessageSenderWithDisconnect)
if !ok {
return
}
defer dht.peerConnectednessSubscriber.Close()
for {
select {
case e := <-dht.peerConnectednessSubscriber.Out():
pc, ok := e.(event.EvtPeerConnectednessChanged)
if !ok {
logger.Errorf("invalid event message type: %T", e)
}
if pc.Connectedness != network.Connected {
ms.OnDisconnect(dht.ctx, pc.Peer)
}
case <-dht.ctx.Done():
return

Check warning on line 241 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L223-L241

Added lines #L223 - L241 were not covered by tests
}
}
}

func (dht *FullRT) TriggerRefresh(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down

0 comments on commit e6be480

Please sign in to comment.