From 9b387bea6301eb91ee5fc5af64ce6b105dc4fc41 Mon Sep 17 00:00:00 2001 From: John Starich Date: Tue, 28 May 2024 16:09:05 -0500 Subject: [PATCH 1/6] Fix broadcast data races (#151) **Requirements** - [x] I have added test coverage for new or changed functionality - [x] I have followed the repository's [pull request submission guidelines](../blob/v5/CONTRIBUTING.md#submitting-pull-requests) - [ ] I have validated my changes against all supported platform versions * What does "platform" mean here? Go version? OS version? Happy to help test, just not sure what I'm testing! **Related issues** I hit data races when running tests in parallel, and they effectively obscure other data races my application may have. It looks like others did too, judging by #102. Fixes https://github.com/launchdarkly/go-server-sdk/issues/102 **Describe the solution you've provided** I've addressed 2 data races with improved locking: * `HasListeners()` had a data race due to any mutation on `b.subscribers` * `Close()` had a data race where closing a send channel triggers a panic in `Broadcast()` I also saw an easy opportunity to use more fine-grained locks with an RWMutex, although I'm happy to back that out if you would prefer. **Describe alternatives you've considered** I also considered using an atomic data type for subscribers, but I figured that change would be less of a surgical fix. It also may be more difficult to mutate the slice, since a compare-and-swap can fail and would need a loop (it's not as simple as `atomic.Add`). Another idea which may be simpler is using a channel to manage shutdown. Typically I'd use a `context.Context` here to manage cancellation. That'd also prevent `Broadcast()`ing on a full send channel from blocking `Close()`. --- internal/broadcasters.go | 20 +++++++++----------- internal/broadcasters_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/internal/broadcasters.go b/internal/broadcasters.go index 0632ec63..502be709 100644 --- a/internal/broadcasters.go +++ b/internal/broadcasters.go @@ -2,8 +2,6 @@ package internal import ( "sync" - - "golang.org/x/exp/slices" ) // This file defines the publish-subscribe model we use for various status/event types in the SDK. @@ -19,7 +17,7 @@ const subscriberChannelBufferLength = 10 // Broadcaster is our generalized implementation of broadcasters. type Broadcaster[V any] struct { subscribers []channelPair[V] - lock sync.Mutex + lock sync.RWMutex } // We need to keep track of both the channel we use for sending (stored as a reflect.Value, because Value // has methods for sending and closing), and also the @@ -67,18 +65,18 @@ func (b *Broadcaster[V]) RemoveListener(ch <-chan V) { // HasListeners returns true if there are any current subscribers. func (b *Broadcaster[V]) HasListeners() bool { - return len(b.subscribers) > 0 + b.lock.RLock() + hasListeners := len(b.subscribers) > 0 + b.lock.RUnlock() + return hasListeners } // Broadcast broadcasts a value to all current subscribers.
func (b *Broadcaster[V]) Broadcast(value V) { - b.lock.Lock() - ss := slices.Clone(b.subscribers) - b.lock.Unlock() - if len(ss) > 0 { - for _, ch := range ss { - ch.sendCh <- value - } + b.lock.RLock() + defer b.lock.RUnlock() + for _, ch := range b.subscribers { + ch.sendCh <- value } } diff --git a/internal/broadcasters_test.go b/internal/broadcasters_test.go index 9ba13e4d..f968c144 100644 --- a/internal/broadcasters_test.go +++ b/internal/broadcasters_test.go @@ -2,6 +2,7 @@ package internal import ( "fmt" + "sync" "testing" "time" @@ -81,3 +82,31 @@ func testBroadcasterGenerically[V any](t *testing.T, broadcasterFactory func() * }) }) } + +func TestBroadcasterDataRace(t *testing.T) { + t.Parallel() + b := NewBroadcaster[string]() + t.Cleanup(b.Close) + + var waitGroup sync.WaitGroup + for _, fn := range []func(){ + // run every method that uses b.subscribers concurrently to detect data races + func() { b.AddListener() }, + func() { b.Broadcast("foo") }, + func() { b.Close() }, + func() { b.HasListeners() }, + func() { b.RemoveListener(nil) }, + } { + const concurrentRoutinesWithSelf = 2 + // run a method concurrently with itself to detect data races + for i := 0; i < concurrentRoutinesWithSelf; i++ { + waitGroup.Add(1) + fn := fn // make fn a loop-local variable + go func() { + defer waitGroup.Done() + fn() + }() + } + } + waitGroup.Wait() +} From c342ba2de0598469dbcb58fd9d76b2202c612cd1 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Wed, 29 May 2024 16:19:45 -0700 Subject: [PATCH 2/6] Additional test for Broadcaster data race (#152) This is a small cleanup to the existing Broadcasters implementation to use `slices.DeleteFunc`. --- internal/broadcasters.go | 23 ++++++++++------------ internal/broadcasters_test.go | 36 +++++++++++++++++++++++++++++++++-- 2 files changed, 44 insertions(+), 15 deletions(-) diff --git a/internal/broadcasters.go b/internal/broadcasters.go index 502be709..5b902d12 100644 --- a/internal/broadcasters.go +++ b/internal/broadcasters.go @@ -1,6 +1,7 @@ package internal import ( + "slices" "sync" ) @@ -21,7 +22,7 @@ type Broadcaster[V any] struct { } // We need to keep track of both the channel we use for sending (stored as a reflect.Value, because Value -// has methods for sending and closing), and also the +// has methods for sending and closing), and also the channel for receiving. type channelPair[V any] struct { sendCh chan<- V receiveCh <-chan V @@ -48,27 +49,23 @@ func (b *Broadcaster[V]) AddListener() <-chan V { func (b *Broadcaster[V]) RemoveListener(ch <-chan V) { b.lock.Lock() defer b.lock.Unlock() - ss := b.subscribers - for i, s := range ss { + b.subscribers = slices.DeleteFunc(b.subscribers, func(pair channelPair[V]) bool { // The following equality test is the reason why we have to store both the sendCh (chan X) and // the receiveCh (<-chan X) for each subscriber; "s.sendCh == ch" would not be true because // they're of two different types. - if s.receiveCh == ch { - copy(ss[i:], ss[i+1:]) - ss[len(ss)-1] = channelPair[V]{} - b.subscribers = ss[:len(ss)-1] - close(s.sendCh) - break + if pair.receiveCh == ch { + close(pair.sendCh) + return true } - } + return false + }) } // HasListeners returns true if there are any current subscribers. func (b *Broadcaster[V]) HasListeners() bool { b.lock.RLock() - hasListeners := len(b.subscribers) > 0 - b.lock.RUnlock() - return hasListeners + defer b.lock.RUnlock() + return len(b.subscribers) > 0 } // Broadcast broadcasts a value to all current subscribers. 
diff --git a/internal/broadcasters_test.go b/internal/broadcasters_test.go index f968c144..78763b73 100644 --- a/internal/broadcasters_test.go +++ b/internal/broadcasters_test.go @@ -2,6 +2,7 @@ package internal import ( "fmt" + "math/rand" "sync" "testing" "time" @@ -90,7 +91,7 @@ func TestBroadcasterDataRace(t *testing.T) { var waitGroup sync.WaitGroup for _, fn := range []func(){ - // run every method that uses b.subscribers concurrently to detect data races + // Run every method that uses b.subscribers concurrently to detect data races func() { b.AddListener() }, func() { b.Broadcast("foo") }, func() { b.Close() }, @@ -98,7 +99,8 @@ func TestBroadcasterDataRace(t *testing.T) { func() { b.RemoveListener(nil) }, } { const concurrentRoutinesWithSelf = 2 - // run a method concurrently with itself to detect data races + // Run a method concurrently with itself to detect data races. These methods will also be + // run concurrently with the previous/next methods in the list. for i := 0; i < concurrentRoutinesWithSelf; i++ { waitGroup.Add(1) fn := fn // make fn a loop-local variable @@ -110,3 +112,33 @@ func TestBroadcasterDataRace(t *testing.T) { } waitGroup.Wait() } + +func TestBroadcasterDataRaceRandomFunctionOrder(t *testing.T) { + t.Parallel() + b := NewBroadcaster[string]() + t.Cleanup(b.Close) + + funcs := []func(){ + func() { b.AddListener() }, + func() { b.Broadcast("foo") }, + func() { b.Close() }, + func() { b.HasListeners() }, + func() { b.RemoveListener(nil) }, + } + var waitGroup sync.WaitGroup + + const N = 1000 + + // We're going to keep adding random functions to the set of currently executing functions + // for N iterations. This way, we can detect races that might be order-dependent. + + for i := 0; i < N; i++ { + waitGroup.Add(1) + fn := funcs[rand.Intn(len(funcs))] + go func() { + defer waitGroup.Done() + fn() + }() + } + waitGroup.Wait() +} From ade1f06a039ef4ac4f4bc23ce13c1f146b727f89 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Mon, 3 Jun 2024 12:11:58 -0700 Subject: [PATCH 3/6] ensure receiver channel is drained --- internal/broadcasters_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/broadcasters_test.go b/internal/broadcasters_test.go index 78763b73..7ea91c68 100644 --- a/internal/broadcasters_test.go +++ b/internal/broadcasters_test.go @@ -119,7 +119,10 @@ func TestBroadcasterDataRaceRandomFunctionOrder(t *testing.T) { t.Cleanup(b.Close) funcs := []func(){ - func() { b.AddListener() }, + func() { + for range b.AddListener() { + } + }, func() { b.Broadcast("foo") }, func() { b.Close() }, func() { b.HasListeners() }, From e8d60fe7316e3325b68f78419b0af8807fc40852 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Mon, 3 Jun 2024 13:23:09 -0700 Subject: [PATCH 4/6] Remove random order test This test is fundamentally flawed in that the order of executing the broadcasters is important. We can't execute them in random order without causing deadlock, as receivers need to drain their channels for broadcast to unblock. 
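For illustration only (not part of this change), here is a minimal sketch of the blocking behavior described above, written as a hypothetical test alongside the existing ones in package internal. The function name is invented; it uses only the existing NewBroadcaster, AddListener, Broadcast, and subscriberChannelBufferLength, plus the "testing" and "time" imports already present in broadcasters_test.go:

```go
func TestBroadcastBlocksWithoutDraining(t *testing.T) {
	b := NewBroadcaster[string]()
	defer b.Close()

	ch := b.AddListener() // buffered channel with subscriberChannelBufferLength slots
	for i := 0; i < subscriberChannelBufferLength; i++ {
		b.Broadcast("fill") // fills the listener's buffer without blocking
	}

	unblocked := make(chan struct{})
	go func() {
		// The buffer is full and nothing drains ch, so this send blocks.
		// While it is blocked, Broadcast also holds the read lock, so
		// AddListener, RemoveListener, and Close would queue up behind it.
		b.Broadcast("stuck")
		close(unblocked)
	}()

	select {
	case <-unblocked:
		t.Fatal("expected Broadcast to block on the undrained listener")
	case <-time.After(100 * time.Millisecond):
		// Still blocked, as expected.
	}

	<-ch // drain one value; the pending send can now complete
	<-unblocked
}
```

This is why goroutines in a randomized interleaving cannot make progress unless a receiver happens to be draining its channel at the right time.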
--- internal/broadcasters_test.go | 34 ---------------------------------- 1 file changed, 34 deletions(-) diff --git a/internal/broadcasters_test.go b/internal/broadcasters_test.go index 7ea91c68..9e7c7962 100644 --- a/internal/broadcasters_test.go +++ b/internal/broadcasters_test.go @@ -2,7 +2,6 @@ package internal import ( "fmt" - "math/rand" "sync" "testing" "time" @@ -112,36 +111,3 @@ func TestBroadcasterDataRace(t *testing.T) { } waitGroup.Wait() } - -func TestBroadcasterDataRaceRandomFunctionOrder(t *testing.T) { - t.Parallel() - b := NewBroadcaster[string]() - t.Cleanup(b.Close) - - funcs := []func(){ - func() { - for range b.AddListener() { - } - }, - func() { b.Broadcast("foo") }, - func() { b.Close() }, - func() { b.HasListeners() }, - func() { b.RemoveListener(nil) }, - } - var waitGroup sync.WaitGroup - - const N = 1000 - - // We're going to keep adding random functions to the set of currently executing functions - // for N iterations. This way, we can detect races that might be order-dependent. - - for i := 0; i < N; i++ { - waitGroup.Add(1) - fn := funcs[rand.Intn(len(funcs))] - go func() { - defer waitGroup.Done() - fn() - }() - } - waitGroup.Wait() -} From 93c1e3949da3443105e3475f6f42940a1467571f Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Mon, 3 Jun 2024 15:25:49 -0700 Subject: [PATCH 5/6] add test for concurrent AddListener and Broadcast --- internal/broadcasters.go | 5 +++-- internal/broadcasters_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/internal/broadcasters.go b/internal/broadcasters.go index 5b902d12..cb88c0f8 100644 --- a/internal/broadcasters.go +++ b/internal/broadcasters.go @@ -71,8 +71,9 @@ func (b *Broadcaster[V]) HasListeners() bool { // Broadcast broadcasts a value to all current subscribers. func (b *Broadcaster[V]) Broadcast(value V) { b.lock.RLock() - defer b.lock.RUnlock() - for _, ch := range b.subscribers { + subs := slices.Clone(b.subscribers) + b.lock.RUnlock() + for _, ch := range subs { ch.sendCh <- value } } diff --git a/internal/broadcasters_test.go b/internal/broadcasters_test.go index 9e7c7962..0bc2153e 100644 --- a/internal/broadcasters_test.go +++ b/internal/broadcasters_test.go @@ -111,3 +111,35 @@ func TestBroadcasterDataRace(t *testing.T) { } waitGroup.Wait() } + +func TestBroadcastWhileAddingListener(t *testing.T) { + // The purpose of this test is to ensure that the Broadcast method doesn't block adding a listener concurrently. + // This would be the case if the Broadcaster held a write lock while broadcasting, since the channel sends + // could take an arbitrary amount of time. Instead, it clones the list of subscribers locally. + t.Parallel() + b := NewBroadcaster[string]() + t.Cleanup(b.Close) + + // This returns a buffered channel. Fill the buffer entirely. + listener1 := b.AddListener() + for i := 0; i < subscriberChannelBufferLength; i++ { + b.Broadcast("foo") + } + + isUnblocked := make(chan struct{}) + go func() { + // This should block until we either pop something from the channel, or close it. + b.Broadcast("blocked!") + close(isUnblocked) + }() + + th.AssertNoMoreValues(t, isUnblocked, 100*time.Millisecond, "Expected Broadcast to remain blocked") + + // Now, we should be able to add a listener while Broadcast is still blocked. + b.AddListener() + + th.AssertNoMoreValues(t, isUnblocked, 100*time.Millisecond, "Expected Broadcast to remain blocked") + + <-listener1 // Allow Broadcast to push the final value to the listener. 
+ th.AssertChannelClosed(t, isUnblocked, 100*time.Millisecond) +} From 5b1a26a1f2e0c88f5bc5e984f7c0bf5821e06d4b Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Tue, 4 Jun 2024 10:57:14 -0700 Subject: [PATCH 6/6] revert to locking around the broadcast --- internal/broadcasters.go | 5 ++--- internal/broadcasters_test.go | 32 -------------------------------- 2 files changed, 2 insertions(+), 35 deletions(-) diff --git a/internal/broadcasters.go b/internal/broadcasters.go index cb88c0f8..5b902d12 100644 --- a/internal/broadcasters.go +++ b/internal/broadcasters.go @@ -71,9 +71,8 @@ func (b *Broadcaster[V]) HasListeners() bool { // Broadcast broadcasts a value to all current subscribers. func (b *Broadcaster[V]) Broadcast(value V) { b.lock.RLock() - subs := slices.Clone(b.subscribers) - b.lock.RUnlock() - for _, ch := range subs { + defer b.lock.RUnlock() + for _, ch := range b.subscribers { ch.sendCh <- value } } diff --git a/internal/broadcasters_test.go b/internal/broadcasters_test.go index 0bc2153e..9e7c7962 100644 --- a/internal/broadcasters_test.go +++ b/internal/broadcasters_test.go @@ -111,35 +111,3 @@ func TestBroadcasterDataRace(t *testing.T) { } waitGroup.Wait() } - -func TestBroadcastWhileAddingListener(t *testing.T) { - // The purpose of this test is to ensure that the Broadcast method doesn't block adding a listener concurrently. - // This would be the case if the Broadcaster held a write lock while broadcasting, since the channel sends - // could take an arbitrary amount of time. Instead, it clones the list of subscribers locally. - t.Parallel() - b := NewBroadcaster[string]() - t.Cleanup(b.Close) - - // This returns a buffered channel. Fill the buffer entirely. - listener1 := b.AddListener() - for i := 0; i < subscriberChannelBufferLength; i++ { - b.Broadcast("foo") - } - - isUnblocked := make(chan struct{}) - go func() { - // This should block until we either pop something from the channel, or close it. - b.Broadcast("blocked!") - close(isUnblocked) - }() - - th.AssertNoMoreValues(t, isUnblocked, 100*time.Millisecond, "Expected Broadcast to remain blocked") - - // Now, we should be able to add a listener while Broadcast is still blocked. - b.AddListener() - - th.AssertNoMoreValues(t, isUnblocked, 100*time.Millisecond, "Expected Broadcast to remain blocked") - - <-listener1 // Allow Broadcast to push the final value to the listener. - th.AssertChannelClosed(t, isUnblocked, 100*time.Millisecond) -}