Skip to content

Commit

Permalink
add test for concurrent AddListener and Broadcast
Browse files Browse the repository at this point in the history
  • Loading branch information
cwaldren-ld committed Jun 3, 2024
1 parent e8d60fe commit eaa33b9
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
5 changes: 3 additions & 2 deletions internal/broadcasters.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ func (b *Broadcaster[V]) HasListeners() bool {
// Broadcast broadcasts a value to all current subscribers.
func (b *Broadcaster[V]) Broadcast(value V) {
b.lock.RLock()
defer b.lock.RUnlock()
for _, ch := range b.subscribers {
subs := slices.Clone(b.subscribers)
b.lock.RUnlock()
for _, ch := range subs {
ch.sendCh <- value
}
}
Expand Down
40 changes: 40 additions & 0 deletions internal/broadcasters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,43 @@ func TestBroadcasterDataRace(t *testing.T) {
}
waitGroup.Wait()
}

func TestBroadcastWhileAddingListener(t *testing.T) {
// The purpose of this test is to ensure that the Broadcast method doesn't block adding a listener concurrently.
// This would be the case if the Broadcaster held a write lock while broadcasting, since the channel sends
// could take an arbitrary amount of time. Instead, it clones the list of subscribers locally.
t.Parallel()
b := NewBroadcaster[string]()
t.Cleanup(b.Close)

// This returns a buffered channel. Fill the buffer entirely.
listener1 := b.AddListener()
for i := 0; i < subscriberChannelBufferLength; i++ {
b.Broadcast("foo")
}

isUnblocked := make(chan struct{})
go func() {
// This should block until we either pop something from the channel, or close it.
b.Broadcast("blocked!")
close(isUnblocked)
}()

select {
case <-isUnblocked:
t.Fatal("Didn't expect b.Broadcast to unblock yet")
default:
}

// Now, we should be able to add a listener while Broadcast is still blocked.
b.AddListener()

select {
case <-isUnblocked:
t.Fatal("Didn't expect b.Broadcast to unblock yet")
default:
}

<-listener1 // Allow Broadcast to push the final value to the listener.
th.AssertChannelClosed(t, isUnblocked, 100*time.Millisecond)
}

0 comments on commit eaa33b9

Please sign in to comment.