Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into api-cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
paulwe committed Sep 18, 2024
2 parents 471b058 + 822e901 commit 89a44c6
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 40 deletions.
90 changes: 79 additions & 11 deletions utils/event_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,37 @@ func (o *eventEmitterObserver[K, V]) Stop() {
o.e.cleanUpObserverList(o.k)
}

func (e *EventEmitter[K, V]) On(k K, f func(V)) func() {
e.mu.Lock()
o := e.getOrCreateEventObserverList(k).on(f)
e.mu.Unlock()

return (&eventEmitterObserver[K, V]{e, k, o}).Stop
}

func (e *EventEmitter[K, V]) Notify(k K, ch chan V) func() {
return e.observe(k, ch).Stop
}

func (e *EventEmitter[K, V]) Observe(k K) EventObserver[V] {
return e.observe(k, make(chan V, e.params.QueueSize))
}

func (e *EventEmitter[K, V]) observe(k K, ch chan V) *eventEmitterObserver[K, V] {
e.mu.Lock()
o := e.getOrCreateEventObserverList(k).observe(ch)
e.mu.Unlock()

return &eventEmitterObserver[K, V]{e, k, o}
}

func (e *EventEmitter[K, V]) getOrCreateEventObserverList(k K) *EventObserverList[V] {
l, ok := e.observers[k]
if !ok {
l = NewEventObserverList[V](e.params)
e.observers[k] = l
}
o := l.Observe()
e.mu.Unlock()

return &eventEmitterObserver[K, V]{e, k, o}
return l
}

func (e *EventEmitter[K, V]) ObservedKeys() []K {
Expand Down Expand Up @@ -138,26 +158,47 @@ func (o *eventObserverListObserver[V]) Stop() {
o.l.stopObserving(o)
}

func (l *EventObserverList[V]) On(f func(V)) func() {
return l.on(f).Stop
}

func (l *EventObserverList[V]) on(f func(V)) *eventObserverListObserver[V] {
o := &eventObserverListObserver[V]{l: l}

if l.params.Blocking {
o.EventObserver = blockingEventCallback[V](f)
} else {
o.EventObserver = nonblockingEventCallback[V](f)
}

l.startObserving(o)
return o
}

func (l *EventObserverList[V]) Notify(ch chan V) func() {
return l.observe(ch).Stop
}

func (l *EventObserverList[V]) Observe() EventObserver[V] {
return l.observe(make(chan V, l.params.QueueSize))
}

func (l *EventObserverList[V]) observe(ch chan V) *eventObserverListObserver[V] {
o := &eventObserverListObserver[V]{l: l}

if l.params.Blocking {
o.EventObserver = &blockingEventObserver[V]{
done: make(chan struct{}),
ch: make(chan V, l.params.QueueSize),
ch: ch,
}
} else {
o.EventObserver = &nonblockingEventObserver[V]{
logger: l.params.Logger,
ch: make(chan V, l.params.QueueSize),
ch: ch,
}
}

l.mu.Lock()
o.index = len(l.observers)
l.observers = append(l.observers, o)
l.mu.Unlock()

l.startObserving(o)
return o
}

Expand All @@ -169,6 +210,13 @@ func (l *EventObserverList[V]) Emit(v V) {
}
}

func (l *EventObserverList[V]) startObserving(o *eventObserverListObserver[V]) {
l.mu.Lock()
defer l.mu.Unlock()
o.index = len(l.observers)
l.observers = append(l.observers, o)
}

func (l *EventObserverList[V]) stopObserving(o *eventObserverListObserver[V]) {
l.mu.Lock()
defer l.mu.Unlock()
Expand Down Expand Up @@ -240,3 +288,23 @@ func (o *blockingEventObserver[V]) Stop() {
func (o *blockingEventObserver[V]) Events() <-chan V {
return o.ch
}

type nonblockingEventCallback[V any] func(V)

func (o nonblockingEventCallback[V]) emit(v V) {
go o(v)
}

func (o nonblockingEventCallback[V]) Stop() {}

func (o nonblockingEventCallback[V]) Events() <-chan V { return nil }

type blockingEventCallback[V any] func(V)

func (o blockingEventCallback[V]) emit(v V) {
o(v)
}

func (o blockingEventCallback[V]) Stop() {}

func (o blockingEventCallback[V]) Events() <-chan V { return nil }
50 changes: 50 additions & 0 deletions utils/event_emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,56 @@ func TestEventEmitter(t *testing.T) {
require.True(t, closeCalled)
})

t.Run("notify", func(t *testing.T) {
emitter := NewDefaultEventEmitter[string, int]()

as := make(chan int, 1)
stop := emitter.Notify("a", as)

emitter.Emit("a", 0)
select {
case v := <-as:
require.Equal(t, 0, v)
default:
require.FailNow(t, "expected event in channel")
}

stop()

emitter.Emit("a", 0)
select {
case <-as:
require.FailNow(t, "expected no event in channel after stop")
default:
}
})

t.Run("on", func(t *testing.T) {
emitter := NewDefaultEventEmitter[string, int]()

as := make(chan int, 1)
stop := emitter.On("a", func(i int) {
as <- i
})

emitter.Emit("a", 0)
select {
case v := <-as:
require.Equal(t, 0, v)
case <-time.After(100 * time.Millisecond):
require.FailNow(t, "expected event in channel")
}

stop()

emitter.Emit("a", 0)
select {
case <-as:
require.FailNow(t, "expected no event in channel after stop")
case <-time.After(100 * time.Millisecond):
}
})

t.Run("stop unblocks blocking observers", func(t *testing.T) {
observer, emit := NewEventObserver[int](func() {})

Expand Down
37 changes: 8 additions & 29 deletions utils/math.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,37 +14,16 @@

package utils

import (
"time"
import "math"

"golang.org/x/exp/constraints"
)

type Numeric interface {
constraints.Signed | constraints.Unsigned | time.Duration
}

func Max[T Numeric](vs ...T) T {
return Least(func(a, b T) bool { return a > b }, vs...)
}

func Min[T Numeric](vs ...T) T {
return Least(func(a, b T) bool { return a < b }, vs...)
}

func Most[T Numeric](less func(a, b T) bool, vs ...T) T {
return Least(func(a, b T) bool { return !less(a, b) }, vs...)
func LogisticFunc(x0, L, k float64) func(x float64) float64 {
return func(x float64) float64 {
return L / (1 + math.Pow(math.E, -k*(x-x0)))
}
}

func Least[T Numeric](less func(a, b T) bool, vs ...T) T {
if len(vs) == 0 {
return 0
}
v := vs[0]
for i := 1; i < len(vs); i++ {
if less(vs[i], v) {
v = vs[i]
}
func FastLogisticFunc(x0, L, k float64) func(x float64) float64 {
return func(x float64) float64 {
return L / 2 * (1 + k*(x-x0)/(1+math.Abs(k*(x-x0))))
}
return v
}
22 changes: 22 additions & 0 deletions utils/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package options

func Apply[T any, F ~func(T)](o T, opts []F) T {
for _, opt := range opts {
opt(o)
}
return o
}
21 changes: 21 additions & 0 deletions utils/proto.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import "google.golang.org/protobuf/proto"

func CloneProto[T proto.Message](m T) T {
return proto.Clone(m).(T)
}

0 comments on commit 89a44c6

Please sign in to comment.