Skip to content

Commit

Permalink
fix: resolve data race in Broadcasters system (#153)
Browse files Browse the repository at this point in the history
This resolves a data race in the Broadcaster implementation,
specifically ` HasListeners()` was not protected by a lock.

Additionally it swaps out the mutex for a `RWLock` so that concurrent
readers don't block each other.

A behavioral change is that the `Broadcast` method now locks the
subscribers array, so that `Add` or `RemoveListener` cannot be called
concurrently.

---------

Co-authored-by: John Starich <[email protected]>
  • Loading branch information
cwaldren-ld and JohnStarich authored Jun 4, 2024
1 parent 68e3440 commit 68cb1a4
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 20 deletions.
35 changes: 15 additions & 20 deletions internal/broadcasters.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package internal

import (
"slices"
"sync"

"golang.org/x/exp/slices"
)

// This file defines the publish-subscribe model we use for various status/event types in the SDK.
Expand All @@ -19,11 +18,11 @@ const subscriberChannelBufferLength = 10
// Broadcaster is our generalized implementation of broadcasters.
type Broadcaster[V any] struct {
subscribers []channelPair[V]
lock sync.Mutex
lock sync.RWMutex
}

// We need to keep track of both the channel we use for sending (stored as a reflect.Value, because Value
// has methods for sending and closing), and also the
// has methods for sending and closing), and also the channel for receiving.
type channelPair[V any] struct {
sendCh chan<- V
receiveCh <-chan V
Expand All @@ -50,35 +49,31 @@ func (b *Broadcaster[V]) AddListener() <-chan V {
func (b *Broadcaster[V]) RemoveListener(ch <-chan V) {
b.lock.Lock()
defer b.lock.Unlock()
ss := b.subscribers
for i, s := range ss {
b.subscribers = slices.DeleteFunc(b.subscribers, func(pair channelPair[V]) bool {
// The following equality test is the reason why we have to store both the sendCh (chan X) and
// the receiveCh (<-chan X) for each subscriber; "s.sendCh == ch" would not be true because
// they're of two different types.
if s.receiveCh == ch {
copy(ss[i:], ss[i+1:])
ss[len(ss)-1] = channelPair[V]{}
b.subscribers = ss[:len(ss)-1]
close(s.sendCh)
break
if pair.receiveCh == ch {
close(pair.sendCh)
return true
}
}
return false
})
}

// HasListeners returns true if there are any current subscribers.
func (b *Broadcaster[V]) HasListeners() bool {
b.lock.RLock()
defer b.lock.RUnlock()
return len(b.subscribers) > 0
}

// Broadcast broadcasts a value to all current subscribers.
func (b *Broadcaster[V]) Broadcast(value V) {
b.lock.Lock()
ss := slices.Clone(b.subscribers)
b.lock.Unlock()
if len(ss) > 0 {
for _, ch := range ss {
ch.sendCh <- value
}
b.lock.RLock()
defer b.lock.RUnlock()
for _, ch := range b.subscribers {
ch.sendCh <- value
}
}

Expand Down
30 changes: 30 additions & 0 deletions internal/broadcasters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -81,3 +82,32 @@ func testBroadcasterGenerically[V any](t *testing.T, broadcasterFactory func() *
})
})
}

func TestBroadcasterDataRace(t *testing.T) {
t.Parallel()
b := NewBroadcaster[string]()
t.Cleanup(b.Close)

var waitGroup sync.WaitGroup
for _, fn := range []func(){
// Run every method that uses b.subscribers concurrently to detect data races
func() { b.AddListener() },
func() { b.Broadcast("foo") },
func() { b.Close() },
func() { b.HasListeners() },
func() { b.RemoveListener(nil) },
} {
const concurrentRoutinesWithSelf = 2
// Run a method concurrently with itself to detect data races. These methods will also be
// run concurrently with the previous/next methods in the list.
for i := 0; i < concurrentRoutinesWithSelf; i++ {
waitGroup.Add(1)
fn := fn // make fn a loop-local variable
go func() {
defer waitGroup.Done()
fn()
}()
}
}
waitGroup.Wait()
}

0 comments on commit 68cb1a4

Please sign in to comment.