Skip to content

Commit

Permalink
Additional test for Broadcaster data race (#152)
Browse files Browse the repository at this point in the history
This is a small cleanup to the existing Broadcasters implementation to
use `slices.DeleteFunc`.
  • Loading branch information
cwaldren-ld authored May 29, 2024
1 parent 9b387be commit c342ba2
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 15 deletions.
23 changes: 10 additions & 13 deletions internal/broadcasters.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package internal

import (
"slices"
"sync"
)

Expand All @@ -21,7 +22,7 @@ type Broadcaster[V any] struct {
}

// We need to keep track of both the channel we use for sending (stored as a reflect.Value, because Value
// has methods for sending and closing), and also the
// has methods for sending and closing), and also the channel for receiving.
type channelPair[V any] struct {
sendCh chan<- V
receiveCh <-chan V
Expand All @@ -48,27 +49,23 @@ func (b *Broadcaster[V]) AddListener() <-chan V {
func (b *Broadcaster[V]) RemoveListener(ch <-chan V) {
b.lock.Lock()
defer b.lock.Unlock()
ss := b.subscribers
for i, s := range ss {
b.subscribers = slices.DeleteFunc(b.subscribers, func(pair channelPair[V]) bool {
// The following equality test is the reason why we have to store both the sendCh (chan X) and
// the receiveCh (<-chan X) for each subscriber; "s.sendCh == ch" would not be true because
// they're of two different types.
if s.receiveCh == ch {
copy(ss[i:], ss[i+1:])
ss[len(ss)-1] = channelPair[V]{}
b.subscribers = ss[:len(ss)-1]
close(s.sendCh)
break
if pair.receiveCh == ch {
close(pair.sendCh)
return true
}
}
return false
})
}

// HasListeners returns true if there are any current subscribers.
func (b *Broadcaster[V]) HasListeners() bool {
b.lock.RLock()
hasListeners := len(b.subscribers) > 0
b.lock.RUnlock()
return hasListeners
defer b.lock.RUnlock()
return len(b.subscribers) > 0
}

// Broadcast broadcasts a value to all current subscribers.
Expand Down
36 changes: 34 additions & 2 deletions internal/broadcasters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internal

import (
"fmt"
"math/rand"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -90,15 +91,16 @@ func TestBroadcasterDataRace(t *testing.T) {

var waitGroup sync.WaitGroup
for _, fn := range []func(){
// run every method that uses b.subscribers concurrently to detect data races
// Run every method that uses b.subscribers concurrently to detect data races
func() { b.AddListener() },
func() { b.Broadcast("foo") },
func() { b.Close() },
func() { b.HasListeners() },
func() { b.RemoveListener(nil) },
} {
const concurrentRoutinesWithSelf = 2
// run a method concurrently with itself to detect data races
// Run a method concurrently with itself to detect data races. These methods will also be
// run concurrently with the previous/next methods in the list.
for i := 0; i < concurrentRoutinesWithSelf; i++ {
waitGroup.Add(1)
fn := fn // make fn a loop-local variable
Expand All @@ -110,3 +112,33 @@ func TestBroadcasterDataRace(t *testing.T) {
}
waitGroup.Wait()
}

func TestBroadcasterDataRaceRandomFunctionOrder(t *testing.T) {
t.Parallel()
b := NewBroadcaster[string]()
t.Cleanup(b.Close)

funcs := []func(){
func() { b.AddListener() },
func() { b.Broadcast("foo") },
func() { b.Close() },
func() { b.HasListeners() },
func() { b.RemoveListener(nil) },
}
var waitGroup sync.WaitGroup

const N = 1000

// We're going to keep adding random functions to the set of currently executing functions
// for N iterations. This way, we can detect races that might be order-dependent.

for i := 0; i < N; i++ {
waitGroup.Add(1)
fn := funcs[rand.Intn(len(funcs))]
go func() {
defer waitGroup.Done()
fn()
}()
}
waitGroup.Wait()
}

0 comments on commit c342ba2

Please sign in to comment.