Skip to content

Commit

Permalink
unblock blocked event emitters when stopped (#790)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulwe authored Aug 14, 2024
1 parent 0072ee0 commit f5a53f8
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 1 deletion.
2 changes: 1 addition & 1 deletion utils/event_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ type eventObserverListObserver[V any] struct {
}

func (o *eventObserverListObserver[V]) Stop() {
o.l.stopObserving(o)
o.EventObserver.Stop()
o.l.stopObserving(o)
}

func (l *EventObserverList[V]) Observe() EventObserver[V] {
Expand Down
73 changes: 73 additions & 0 deletions utils/event_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package utils
import (
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -60,6 +61,78 @@ func TestEventEmitter(t *testing.T) {
o.Stop()
require.True(t, closeCalled)
})

t.Run("stop unblocks blocking observers", func(t *testing.T) {
observer, emit := NewEventObserver[int](func() {})

list := NewEventObserverList[int](EventEmitterParams{
QueueSize: DefaultEventQueueSize,
Blocking: true,
})

emitter := NewEventEmitter[int, int](EventEmitterParams{
QueueSize: DefaultEventQueueSize,
Blocking: true,
})

cases := []struct {
label string
emit func()
observer EventObserver[int]
}{
{
label: "observer",
emit: func() { emit(0) },
observer: observer,
},
{
label: "list",
emit: func() { list.Emit(0) },
observer: list.Observe(),
},
{
label: "emitter",
emit: func() { emitter.Emit(0, 0) },
observer: emitter.Observe(0),
},
}

for _, c := range cases {
t.Run(c.label, func(t *testing.T) {
emitDone := make(chan struct{})
stopDone := make(chan struct{})
ready := make(chan struct{})

go func() {
for i := 0; i < DefaultEventQueueSize; i++ {
c.emit()
}
close(ready)
c.emit()
close(emitDone)
}()

go func() {
<-ready
time.Sleep(100 * time.Millisecond)
c.observer.Stop()
close(stopDone)
}()

select {
case <-emitDone:
case <-time.After(time.Second):
require.FailNow(t, "timeout waiting for emit to unblock")
}

select {
case <-stopDone:
case <-time.After(time.Second):
require.FailNow(t, "timeout waiting for stop to unblock")
}
})
}
})
}

func BenchmarkEventEmitter(b *testing.B) {
Expand Down

0 comments on commit f5a53f8

Please sign in to comment.