diff --git a/utils/event_emitter.go b/utils/event_emitter.go deleted file mode 100644 index 1dbf6194..00000000 --- a/utils/event_emitter.go +++ /dev/null @@ -1,310 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package utils - -import ( - "sync" - - "golang.org/x/exp/maps" - - "github.com/livekit/protocol/logger" -) - -const DefaultEventQueueSize = 16 - -type EventEmitterParams struct { - QueueSize int - Blocking bool - Logger logger.Logger -} - -func DefaultEventEmitterParams() EventEmitterParams { - return EventEmitterParams{ - QueueSize: DefaultEventQueueSize, - Logger: logger.GetLogger(), - } -} - -type EventEmitter[K comparable, V any] struct { - params EventEmitterParams - mu sync.RWMutex - observers map[K]*EventObserverList[V] -} - -func NewEventEmitter[K comparable, V any](params EventEmitterParams) *EventEmitter[K, V] { - return &EventEmitter[K, V]{ - params: params, - observers: map[K]*EventObserverList[V]{}, - } -} - -func NewDefaultEventEmitter[K comparable, V any]() *EventEmitter[K, V] { - return NewEventEmitter[K, V](DefaultEventEmitterParams()) -} - -func (e *EventEmitter[K, V]) Emit(k K, v V) { - e.mu.RLock() - l, ok := e.observers[k] - e.mu.RUnlock() - if !ok { - return - } - - l.Emit(v) -} - -type eventEmitterObserver[K comparable, V any] struct { - e *EventEmitter[K, V] - k K - EventObserver[V] -} - -func (o *eventEmitterObserver[K, V]) Stop() { - o.EventObserver.Stop() - o.e.cleanUpObserverList(o.k) -} - -func (e *EventEmitter[K, V]) On(k K, f func(V)) func() { - e.mu.Lock() - o := e.getOrCreateEventObserverList(k).on(f) - e.mu.Unlock() - - return (&eventEmitterObserver[K, V]{e, k, o}).Stop -} - -func (e *EventEmitter[K, V]) Notify(k K, ch chan V) func() { - return e.observe(k, ch).Stop -} - -func (e *EventEmitter[K, V]) Observe(k K) EventObserver[V] { - return e.observe(k, make(chan V, e.params.QueueSize)) -} - -func (e *EventEmitter[K, V]) observe(k K, ch chan V) *eventEmitterObserver[K, V] { - e.mu.Lock() - o := e.getOrCreateEventObserverList(k).observe(ch) - e.mu.Unlock() - - return &eventEmitterObserver[K, V]{e, k, o} -} - -func (e *EventEmitter[K, V]) getOrCreateEventObserverList(k K) *EventObserverList[V] { - l, ok := e.observers[k] - if !ok { - l = NewEventObserverList[V](e.params) - e.observers[k] = l - } - return l -} - -func (e *EventEmitter[K, V]) ObservedKeys() []K { - e.mu.Lock() - defer e.mu.Unlock() - return maps.Keys(e.observers) -} - -func (e *EventEmitter[K, V]) cleanUpObserverList(k K) { - e.mu.Lock() - defer e.mu.Unlock() - - l, ok := e.observers[k] - if ok && l.Len() == 0 { - delete(e.observers, k) - } -} - -type EventObserverList[V any] struct { - params EventEmitterParams - mu sync.RWMutex - observers []*eventObserverListObserver[V] -} - -func NewEventObserverList[V any](params EventEmitterParams) *EventObserverList[V] { - return &EventObserverList[V]{ - params: params, - } -} - -func NewDefaultEventObserverList[V any]() *EventObserverList[V] { - return NewEventObserverList[V](DefaultEventEmitterParams()) -} - -func (l *EventObserverList[V]) Len() int { - l.mu.RLock() - defer l.mu.RUnlock() - return len(l.observers) -} - -type eventObserverListObserver[V any] struct { - l *EventObserverList[V] - EventObserver[V] - index int -} - -func (o *eventObserverListObserver[V]) Stop() { - o.EventObserver.Stop() - o.l.stopObserving(o) -} - -func (l *EventObserverList[V]) On(f func(V)) func() { - return l.on(f).Stop -} - -func (l *EventObserverList[V]) on(f func(V)) *eventObserverListObserver[V] { - o := &eventObserverListObserver[V]{l: l} - - if l.params.Blocking { - o.EventObserver = blockingEventCallback[V](f) - } else { - o.EventObserver = nonblockingEventCallback[V](f) - } - - l.startObserving(o) - return o -} - -func (l *EventObserverList[V]) Notify(ch chan V) func() { - return l.observe(ch).Stop -} - -func (l *EventObserverList[V]) Observe() EventObserver[V] { - return l.observe(make(chan V, l.params.QueueSize)) -} - -func (l *EventObserverList[V]) observe(ch chan V) *eventObserverListObserver[V] { - o := &eventObserverListObserver[V]{l: l} - - if l.params.Blocking { - o.EventObserver = &blockingEventObserver[V]{ - done: make(chan struct{}), - ch: ch, - } - } else { - o.EventObserver = &nonblockingEventObserver[V]{ - logger: l.params.Logger, - ch: ch, - } - } - - l.startObserving(o) - return o -} - -func (l *EventObserverList[V]) Emit(v V) { - l.mu.RLock() - defer l.mu.RUnlock() - for _, o := range l.observers { - o.emit(v) - } -} - -func (l *EventObserverList[V]) startObserving(o *eventObserverListObserver[V]) { - l.mu.Lock() - defer l.mu.Unlock() - o.index = len(l.observers) - l.observers = append(l.observers, o) -} - -func (l *EventObserverList[V]) stopObserving(o *eventObserverListObserver[V]) { - l.mu.Lock() - defer l.mu.Unlock() - l.observers[o.index] = l.observers[len(l.observers)-1] - l.observers[o.index].index = o.index - l.observers[len(l.observers)-1] = nil - l.observers = l.observers[:len(l.observers)-1] -} - -type EventObserver[V any] interface { - emit(v V) - Stop() - Events() <-chan V -} - -type eventObserver[V any] struct { - stopFunc func() - EventObserver[V] -} - -func (o *eventObserver[V]) Stop() { - o.stopFunc() - o.EventObserver.Stop() -} - -func NewEventObserver[V any](stopFunc func()) (EventObserver[V], func(v V)) { - o := &nonblockingEventObserver[V]{ - logger: logger.GetLogger(), - ch: make(chan V, DefaultEventQueueSize), - } - return &eventObserver[V]{stopFunc, o}, o.emit -} - -type nonblockingEventObserver[V any] struct { - logger logger.Logger - ch chan V -} - -func (o *nonblockingEventObserver[V]) emit(v V) { - select { - case o.ch <- v: - default: - o.logger.Warnw("could not add event to observer queue", nil) - } -} - -func (o *nonblockingEventObserver[V]) Stop() {} - -func (o *nonblockingEventObserver[V]) Events() <-chan V { - return o.ch -} - -type blockingEventObserver[V any] struct { - done chan struct{} - ch chan V -} - -func (o *blockingEventObserver[V]) emit(v V) { - select { - case o.ch <- v: - case <-o.done: - } -} - -func (o *blockingEventObserver[V]) Stop() { - close(o.done) -} - -func (o *blockingEventObserver[V]) Events() <-chan V { - return o.ch -} - -type nonblockingEventCallback[V any] func(V) - -func (o nonblockingEventCallback[V]) emit(v V) { - go o(v) -} - -func (o nonblockingEventCallback[V]) Stop() {} - -func (o nonblockingEventCallback[V]) Events() <-chan V { return nil } - -type blockingEventCallback[V any] func(V) - -func (o blockingEventCallback[V]) emit(v V) { - o(v) -} - -func (o blockingEventCallback[V]) Stop() {} - -func (o blockingEventCallback[V]) Events() <-chan V { return nil } diff --git a/utils/events/emitter.go b/utils/events/emitter.go new file mode 100644 index 00000000..eecc8312 --- /dev/null +++ b/utils/events/emitter.go @@ -0,0 +1,337 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "sync" + + "golang.org/x/exp/maps" + + "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils/options" +) + +const DefaultQueueSize = 16 + +type Options struct { + QueueSize int + Blocking bool + Logger logger.Logger +} + +type Option func(o *Options) + +func WithQueueSize(size int) Option { + return func(o *Options) { + o.QueueSize = size + } +} + +func WithBlocking() Option { + return func(o *Options) { + o.Blocking = true + } +} + +func WithLogger(l logger.Logger) Option { + return func(o *Options) { + o.Logger = l + } +} + +func WithOptions(src Options) Option { + return func(o *Options) { + *o = src + } +} + +func DefaultOptions() Options { + return Options{ + QueueSize: DefaultQueueSize, + Logger: logger.GetLogger(), + } +} + +type Emitter[K comparable, V any] struct { + options Options + mu sync.RWMutex + observers map[K]*ObserverList[V] +} + +func NewEmitter[K comparable, V any](opts ...Option) *Emitter[K, V] { + o := DefaultOptions() + options.Apply(&o, opts) + return &Emitter[K, V]{ + options: o, + observers: map[K]*ObserverList[V]{}, + } +} + +func (e *Emitter[K, V]) Emit(k K, v V) { + e.mu.RLock() + l, ok := e.observers[k] + e.mu.RUnlock() + if !ok { + return + } + + l.Emit(v) +} + +type eventEmitterObserver[K comparable, V any] struct { + e *Emitter[K, V] + k K + Observer[V] +} + +func (o *eventEmitterObserver[K, V]) Stop() { + o.Observer.Stop() + o.e.cleanUpObserverList(o.k) +} + +func (e *Emitter[K, V]) On(k K, f func(V)) func() { + e.mu.Lock() + o := e.getOrCreateObserverList(k).on(f) + e.mu.Unlock() + + return (&eventEmitterObserver[K, V]{e, k, o}).Stop +} + +func (e *Emitter[K, V]) Notify(k K, ch chan V) func() { + return e.observe(k, ch).Stop +} + +func (e *Emitter[K, V]) Observe(k K) Observer[V] { + return e.observe(k, make(chan V, e.options.QueueSize)) +} + +func (e *Emitter[K, V]) observe(k K, ch chan V) *eventEmitterObserver[K, V] { + e.mu.Lock() + o := e.getOrCreateObserverList(k).observe(ch) + e.mu.Unlock() + + return &eventEmitterObserver[K, V]{e, k, o} +} + +func (e *Emitter[K, V]) getOrCreateObserverList(k K) *ObserverList[V] { + l, ok := e.observers[k] + if !ok { + l = newObserverList[V](e.options) + e.observers[k] = l + } + return l +} + +func (e *Emitter[K, V]) ObservedKeys() []K { + e.mu.Lock() + defer e.mu.Unlock() + return maps.Keys(e.observers) +} + +func (e *Emitter[K, V]) cleanUpObserverList(k K) { + e.mu.Lock() + defer e.mu.Unlock() + + l, ok := e.observers[k] + if ok && l.Len() == 0 { + delete(e.observers, k) + } +} + +type ObserverList[V any] struct { + options Options + mu sync.RWMutex + observers []*eventObserverListObserver[V] +} + +func NewObserverList[V any](opts ...Option) *ObserverList[V] { + o := DefaultOptions() + options.Apply(&o, opts) + return newObserverList[V](o) +} + +func newObserverList[V any](options Options) *ObserverList[V] { + return &ObserverList[V]{ + options: options, + } +} + +func (l *ObserverList[V]) Len() int { + l.mu.RLock() + defer l.mu.RUnlock() + return len(l.observers) +} + +type eventObserverListObserver[V any] struct { + l *ObserverList[V] + Observer[V] + index int +} + +func (o *eventObserverListObserver[V]) Stop() { + o.Observer.Stop() + o.l.stopObserving(o) +} + +func (l *ObserverList[V]) On(f func(V)) func() { + return l.on(f).Stop +} + +func (l *ObserverList[V]) on(f func(V)) *eventObserverListObserver[V] { + o := &eventObserverListObserver[V]{l: l} + + if l.options.Blocking { + o.Observer = blockingCallback[V](f) + } else { + o.Observer = nonblockingCallback[V](f) + } + + l.startObserving(o) + return o +} + +func (l *ObserverList[V]) Notify(ch chan V) func() { + return l.observe(ch).Stop +} + +func (l *ObserverList[V]) Observe() Observer[V] { + return l.observe(make(chan V, l.options.QueueSize)) +} + +func (l *ObserverList[V]) observe(ch chan V) *eventObserverListObserver[V] { + o := &eventObserverListObserver[V]{l: l} + + if l.options.Blocking { + o.Observer = &blockingObserver[V]{ + done: make(chan struct{}), + ch: ch, + } + } else { + o.Observer = &nonblockingObserver[V]{ + logger: l.options.Logger, + ch: ch, + } + } + + l.startObserving(o) + return o +} + +func (l *ObserverList[V]) Emit(v V) { + l.mu.RLock() + defer l.mu.RUnlock() + for _, o := range l.observers { + o.emit(v) + } +} + +func (l *ObserverList[V]) startObserving(o *eventObserverListObserver[V]) { + l.mu.Lock() + defer l.mu.Unlock() + o.index = len(l.observers) + l.observers = append(l.observers, o) +} + +func (l *ObserverList[V]) stopObserving(o *eventObserverListObserver[V]) { + l.mu.Lock() + defer l.mu.Unlock() + l.observers[o.index] = l.observers[len(l.observers)-1] + l.observers[o.index].index = o.index + l.observers[len(l.observers)-1] = nil + l.observers = l.observers[:len(l.observers)-1] +} + +type Observer[V any] interface { + emit(v V) + Stop() + Events() <-chan V +} + +type eventObserver[V any] struct { + stopFunc func() + Observer[V] +} + +func (o *eventObserver[V]) Stop() { + o.stopFunc() + o.Observer.Stop() +} + +func NewObserver[V any](stopFunc func()) (Observer[V], func(v V)) { + o := &nonblockingObserver[V]{ + logger: logger.GetLogger(), + ch: make(chan V, DefaultQueueSize), + } + return &eventObserver[V]{stopFunc, o}, o.emit +} + +type nonblockingObserver[V any] struct { + logger logger.Logger + ch chan V +} + +func (o *nonblockingObserver[V]) emit(v V) { + select { + case o.ch <- v: + default: + o.logger.Warnw("could not add event to observer queue", nil) + } +} + +func (o *nonblockingObserver[V]) Stop() {} + +func (o *nonblockingObserver[V]) Events() <-chan V { + return o.ch +} + +type blockingObserver[V any] struct { + done chan struct{} + ch chan V +} + +func (o *blockingObserver[V]) emit(v V) { + select { + case o.ch <- v: + case <-o.done: + } +} + +func (o *blockingObserver[V]) Stop() { + close(o.done) +} + +func (o *blockingObserver[V]) Events() <-chan V { + return o.ch +} + +type nonblockingCallback[V any] func(V) + +func (o nonblockingCallback[V]) emit(v V) { + go o(v) +} + +func (o nonblockingCallback[V]) Stop() {} + +func (o nonblockingCallback[V]) Events() <-chan V { return nil } + +type blockingCallback[V any] func(V) + +func (o blockingCallback[V]) emit(v V) { + o(v) +} + +func (o blockingCallback[V]) Stop() {} + +func (o blockingCallback[V]) Events() <-chan V { return nil } diff --git a/utils/event_emitter_test.go b/utils/events/emitter_test.go similarity index 82% rename from utils/event_emitter_test.go rename to utils/events/emitter_test.go index 30c4b7d2..326d529a 100644 --- a/utils/event_emitter_test.go +++ b/utils/events/emitter_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package utils +package events import ( "sort" @@ -24,7 +24,7 @@ import ( func TestEventEmitter(t *testing.T) { t.Run("emitter", func(t *testing.T) { - emitter := NewDefaultEventEmitter[string, int]() + emitter := NewEmitter[string, int]() ao0 := emitter.Observe("a") ao1 := emitter.Observe("a") bo := emitter.Observe("b") @@ -53,7 +53,7 @@ func TestEventEmitter(t *testing.T) { t.Run("observer", func(t *testing.T) { var closeCalled bool - o, emit := NewEventObserver[int](func() { closeCalled = true }) + o, emit := NewObserver[int](func() { closeCalled = true }) emit(1) require.Equal(t, 1, <-o.Events()) @@ -63,7 +63,7 @@ func TestEventEmitter(t *testing.T) { }) t.Run("notify", func(t *testing.T) { - emitter := NewDefaultEventEmitter[string, int]() + emitter := NewEmitter[string, int]() as := make(chan int, 1) stop := emitter.Notify("a", as) @@ -87,7 +87,7 @@ func TestEventEmitter(t *testing.T) { }) t.Run("on", func(t *testing.T) { - emitter := NewDefaultEventEmitter[string, int]() + emitter := NewEmitter[string, int]() as := make(chan int, 1) stop := emitter.On("a", func(i int) { @@ -113,22 +113,16 @@ func TestEventEmitter(t *testing.T) { }) t.Run("stop unblocks blocking observers", func(t *testing.T) { - observer, emit := NewEventObserver[int](func() {}) + observer, emit := NewObserver[int](func() {}) - list := NewEventObserverList[int](EventEmitterParams{ - QueueSize: DefaultEventQueueSize, - Blocking: true, - }) + list := NewObserverList[int](WithBlocking()) - emitter := NewEventEmitter[int, int](EventEmitterParams{ - QueueSize: DefaultEventQueueSize, - Blocking: true, - }) + emitter := NewEmitter[int, int](WithBlocking()) cases := []struct { label string emit func() - observer EventObserver[int] + observer Observer[int] }{ { label: "observer", @@ -154,7 +148,7 @@ func TestEventEmitter(t *testing.T) { ready := make(chan struct{}) go func() { - for i := 0; i < DefaultEventQueueSize; i++ { + for i := 0; i < DefaultQueueSize; i++ { c.emit() } close(ready) @@ -186,9 +180,7 @@ func TestEventEmitter(t *testing.T) { } func BenchmarkEventEmitter(b *testing.B) { - e := NewEventEmitter[int, int](EventEmitterParams{ - QueueSize: DefaultEventQueueSize, - }) + e := NewEmitter[int, int]() for i := 0; i < b.N; i++ { o := e.Observe(i) e.Emit(i, i) @@ -198,9 +190,7 @@ func BenchmarkEventEmitter(b *testing.B) { } func BenchmarkEventObserverList(b *testing.B) { - l := NewEventObserverList[int](EventEmitterParams{ - QueueSize: DefaultEventQueueSize, - }) + l := NewObserverList[int]() for i := 0; i < b.N; i++ { o := l.Observe() l.Emit(i)