Skip to content

Commit

Permalink
add callback functions to event emitter
Browse files Browse the repository at this point in the history
  • Loading branch information
paulwe committed Sep 18, 2024
1 parent 9675e0c commit 32a7635
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 11 deletions.
90 changes: 79 additions & 11 deletions utils/event_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,37 @@ func (o *eventEmitterObserver[K, V]) Stop() {
o.e.cleanUpObserverList(o.k)
}

func (e *EventEmitter[K, V]) On(k K, f func(V)) func() {
e.mu.Lock()
o := e.getOrCreateEventObserverList(k).on(f)
e.mu.Unlock()

return (&eventEmitterObserver[K, V]{e, k, o}).Stop
}

func (e *EventEmitter[K, V]) Notify(k K, ch chan V) func() {
return e.observe(k, ch).Stop
}

func (e *EventEmitter[K, V]) Observe(k K) EventObserver[V] {
return e.observe(k, make(chan V, e.params.QueueSize))
}

func (e *EventEmitter[K, V]) observe(k K, ch chan V) *eventEmitterObserver[K, V] {
e.mu.Lock()
o := e.getOrCreateEventObserverList(k).observe(ch)
e.mu.Unlock()

return &eventEmitterObserver[K, V]{e, k, o}
}

func (e *EventEmitter[K, V]) getOrCreateEventObserverList(k K) *EventObserverList[V] {
l, ok := e.observers[k]
if !ok {
l = NewEventObserverList[V](e.params)
e.observers[k] = l
}
o := l.Observe()
e.mu.Unlock()

return &eventEmitterObserver[K, V]{e, k, o}
return l
}

func (e *EventEmitter[K, V]) ObservedKeys() []K {
Expand Down Expand Up @@ -138,26 +158,47 @@ func (o *eventObserverListObserver[V]) Stop() {
o.l.stopObserving(o)
}

func (l *EventObserverList[V]) On(f func(V)) func() {
return l.on(f).Stop
}

func (l *EventObserverList[V]) on(f func(V)) *eventObserverListObserver[V] {
o := &eventObserverListObserver[V]{l: l}

if l.params.Blocking {
o.EventObserver = blockingEventCallback[V](f)
} else {
o.EventObserver = nonblockingEventCallback[V](f)
}

l.startObserving(o)
return o
}

func (l *EventObserverList[V]) Notify(ch chan V) func() {
return l.observe(ch).Stop
}

func (l *EventObserverList[V]) Observe() EventObserver[V] {
return l.observe(make(chan V, l.params.QueueSize))
}

func (l *EventObserverList[V]) observe(ch chan V) *eventObserverListObserver[V] {
o := &eventObserverListObserver[V]{l: l}

if l.params.Blocking {
o.EventObserver = &blockingEventObserver[V]{
done: make(chan struct{}),
ch: make(chan V, l.params.QueueSize),
ch: ch,
}
} else {
o.EventObserver = &nonblockingEventObserver[V]{
logger: l.params.Logger,
ch: make(chan V, l.params.QueueSize),
ch: ch,
}
}

l.mu.Lock()
o.index = len(l.observers)
l.observers = append(l.observers, o)
l.mu.Unlock()

l.startObserving(o)
return o
}

Expand All @@ -169,6 +210,13 @@ func (l *EventObserverList[V]) Emit(v V) {
}
}

func (l *EventObserverList[V]) startObserving(o *eventObserverListObserver[V]) {
l.mu.Lock()
defer l.mu.Unlock()
o.index = len(l.observers)
l.observers = append(l.observers, o)
}

func (l *EventObserverList[V]) stopObserving(o *eventObserverListObserver[V]) {
l.mu.Lock()
defer l.mu.Unlock()
Expand Down Expand Up @@ -240,3 +288,23 @@ func (o *blockingEventObserver[V]) Stop() {
func (o *blockingEventObserver[V]) Events() <-chan V {
return o.ch
}

type nonblockingEventCallback[V any] func(V)

func (o nonblockingEventCallback[V]) emit(v V) {
go o(v)
}

func (o nonblockingEventCallback[V]) Stop() {}

func (o nonblockingEventCallback[V]) Events() <-chan V { return nil }

type blockingEventCallback[V any] func(V)

func (o blockingEventCallback[V]) emit(v V) {
o(v)
}

func (o blockingEventCallback[V]) Stop() {}

func (o blockingEventCallback[V]) Events() <-chan V { return nil }
50 changes: 50 additions & 0 deletions utils/event_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,56 @@ func TestEventEmitter(t *testing.T) {
require.True(t, closeCalled)
})

t.Run("notify", func(t *testing.T) {
emitter := NewDefaultEventEmitter[string, int]()

as := make(chan int, 1)
stop := emitter.Notify("a", as)

emitter.Emit("a", 0)
select {
case v := <-as:
require.Equal(t, 0, v)
default:
require.FailNow(t, "expected event in channel")
}

stop()

emitter.Emit("a", 0)
select {
case <-as:
require.FailNow(t, "expected no event in channel after stop")
default:
}
})

t.Run("on", func(t *testing.T) {
emitter := NewDefaultEventEmitter[string, int]()

as := make(chan int, 1)
stop := emitter.On("a", func(i int) {
as <- i
})

emitter.Emit("a", 0)
select {
case v := <-as:
require.Equal(t, 0, v)
case <-time.After(100 * time.Millisecond):
require.FailNow(t, "expected event in channel")
}

stop()

emitter.Emit("a", 0)
select {
case <-as:
require.FailNow(t, "expected no event in channel after stop")
case <-time.After(100 * time.Millisecond):
}
})

t.Run("stop unblocks blocking observers", func(t *testing.T) {
observer, emit := NewEventObserver[int](func() {})

Expand Down

0 comments on commit 32a7635

Please sign in to comment.