Skip to content

Commit

Permalink
fix: handle p2p events properly in cluster tests (#3452)
Browse files Browse the repository at this point in the history
## Description

This PR fixes an issue with the p2p event stream where events can be
missed due to not being actively scooped from the channel, causing tests
to hang. This non-blocking nature is by design, the p2p event stream
should be non-blocking for subscribers at all times when new events are
added.

The PR increases the buffer size for events (from 1 to a reasonable
value). For larger event loads, subscribers can handle channel draining
caller side without any issues
  • Loading branch information
zivkovicmilos authored Jan 7, 2025
1 parent f245127 commit faf70cb
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
9 changes: 6 additions & 3 deletions tm2/pkg/internal/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,13 @@ func MakeConnectedPeers(
multiplexSwitch.DialPeers(addrs...)

// Set up an exit timer
timer := time.NewTimer(5 * time.Second)
timer := time.NewTimer(1 * time.Minute)
defer timer.Stop()

connectedPeers := make(map[p2pTypes.ID]struct{})
var (
connectedPeers = make(map[p2pTypes.ID]struct{})
targetPeers = cfg.Count - 1
)

for {
select {
Expand All @@ -143,7 +146,7 @@ func MakeConnectedPeers(

connectedPeers[ev.PeerID] = struct{}{}

if len(connectedPeers) == cfg.Count-1 {
if len(connectedPeers) == targetPeers {
return nil
}
case <-timer.C:
Expand Down
12 changes: 10 additions & 2 deletions tm2/pkg/p2p/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ type (
func (s *subscriptions) add(filterFn EventFilter) (string, chan Event) {
var (
id = xid.New().String()
ch = make(chan Event, 1)
// Since the event stream is non-blocking,
// the event buffer should be sufficiently
// large for most use-cases. Subscribers can
// handle large event load caller-side to mitigate
// events potentially being missed
ch = make(chan Event, 100)
)

(*s)[id] = subscription{
Expand Down Expand Up @@ -99,6 +104,9 @@ func (s *subscriptions) notify(event Event) {
continue
}

sub.ch <- event
select {
case sub.ch <- event:
default: // non-blocking
}
}
}

0 comments on commit faf70cb

Please sign in to comment.