Skip to content

Commit

Permalink
Fix broadcast data races (#151)
Browse files Browse the repository at this point in the history
**Requirements**

- [x] I have added test coverage for new or changed functionality
- [x] I have followed the repository's [pull request submission
guidelines](../blob/v5/CONTRIBUTING.md#submitting-pull-requests)
- [ ] I have validated my changes against all supported platform
versions
* What does "platform" mean here? Go version? OS version? Happy to help
test, just not sure what I'm testing!

**Related issues**

I hit data races when running tests in parallel, which effectively
obscures other data races my application may have. It looks like others
did too, judging by #102.
Fixes #102

**Describe the solution you've provided**

I've addressed 2 data races with improved locking:
* `HasListeners()` had a data race due to any mutation on
`b.subscribers`
* `Close()` had a data race where closing a send channel triggers a
panic in `Broadcast()`

I also saw an easy opportunity to use more fine-grained locks with an
RWMutex, although I'm happy to back that out if you would prefer.

**Describe alternatives you've considered**

I also considered using an atomic data type for subscribers, but I
figured that change would be less of a surgical fix. It also may be more
difficult to mutate the slice, since a compare-and-swap can fail and
would need a loop (it's not as simple as `atomic.Add`).

Another idea which may be simpler is using a channel to manage shutdown.
Typically I'd use a `context.Context` here to manage cancellation.
That'd also prevent `Broadcast()`ing on a full send channel from
blocking `Close()`.

**Additional context**

Add any other context about the pull request here.
  • Loading branch information
JohnStarich authored May 28, 2024
1 parent 68e3440 commit 9b387be
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
20 changes: 9 additions & 11 deletions internal/broadcasters.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package internal

import (
"sync"

"golang.org/x/exp/slices"
)

// This file defines the publish-subscribe model we use for various status/event types in the SDK.
Expand All @@ -19,7 +17,7 @@ const subscriberChannelBufferLength = 10
// Broadcaster is our generalized implementation of broadcasters.
type Broadcaster[V any] struct {
subscribers []channelPair[V]
lock sync.Mutex
lock sync.RWMutex
}

// We need to keep track of both the channel we use for sending (stored as a reflect.Value, because Value
Expand Down Expand Up @@ -67,18 +65,18 @@ func (b *Broadcaster[V]) RemoveListener(ch <-chan V) {

// HasListeners returns true if there are any current subscribers.
func (b *Broadcaster[V]) HasListeners() bool {
return len(b.subscribers) > 0
b.lock.RLock()
hasListeners := len(b.subscribers) > 0
b.lock.RUnlock()
return hasListeners
}

// Broadcast broadcasts a value to all current subscribers.
func (b *Broadcaster[V]) Broadcast(value V) {
b.lock.Lock()
ss := slices.Clone(b.subscribers)
b.lock.Unlock()
if len(ss) > 0 {
for _, ch := range ss {
ch.sendCh <- value
}
b.lock.RLock()
defer b.lock.RUnlock()
for _, ch := range b.subscribers {
ch.sendCh <- value
}
}

Expand Down
29 changes: 29 additions & 0 deletions internal/broadcasters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -81,3 +82,31 @@ func testBroadcasterGenerically[V any](t *testing.T, broadcasterFactory func() *
})
})
}

func TestBroadcasterDataRace(t *testing.T) {
t.Parallel()
b := NewBroadcaster[string]()
t.Cleanup(b.Close)

var waitGroup sync.WaitGroup
for _, fn := range []func(){
// run every method that uses b.subscribers concurrently to detect data races
func() { b.AddListener() },
func() { b.Broadcast("foo") },
func() { b.Close() },
func() { b.HasListeners() },
func() { b.RemoveListener(nil) },
} {
const concurrentRoutinesWithSelf = 2
// run a method concurrently with itself to detect data races
for i := 0; i < concurrentRoutinesWithSelf; i++ {
waitGroup.Add(1)
fn := fn // make fn a loop-local variable
go func() {
defer waitGroup.Done()
fn()
}()
}
}
waitGroup.Wait()
}

0 comments on commit 9b387be

Please sign in to comment.