diff --git a/utils/configobserver.go b/utils/configobserver.go index f8f42942..c05c0e75 100644 --- a/utils/configobserver.go +++ b/utils/configobserver.go @@ -15,15 +15,15 @@ package utils import ( - "container/list" "fmt" "os" - "sync" "github.com/fsnotify/fsnotify" + "go.uber.org/atomic" "gopkg.in/yaml.v3" "github.com/livekit/protocol/logger" + "github.com/livekit/protocol/utils/events" ) type ConfigBuilder[T any] interface { @@ -35,16 +35,16 @@ type ConfigDefaulter[T any] interface { } type ConfigObserver[T any] struct { - builder ConfigBuilder[T] - watcher *fsnotify.Watcher - mu sync.Mutex - cbs list.List - conf *T + builder ConfigBuilder[T] + watcher *fsnotify.Watcher + observers *events.ObserverList[*T] + conf atomic.Pointer[T] } func NewConfigObserver[T any](path string, builder ConfigBuilder[T]) (*ConfigObserver[T], *T, error) { c := &ConfigObserver[T]{ - builder: builder, + builder: builder, + observers: events.NewObserverList[*T](events.WithBlocking()), } conf, err := c.load(path) @@ -74,32 +74,18 @@ func (c *ConfigObserver[T]) Close() { } func (c *ConfigObserver[T]) EmitConfigUpdate(conf *T) { - c.mu.Lock() - defer c.mu.Unlock() - for e := c.cbs.Front(); e != nil; e = e.Next() { - e.Value.(func(*T))(conf) - } + c.observers.Emit(conf) } func (c *ConfigObserver[T]) Observe(cb func(*T)) func() { if c == nil { return func() {} } - c.mu.Lock() - e := c.cbs.PushBack(cb) - c.mu.Unlock() - - return func() { - c.mu.Lock() - c.cbs.Remove(e) - c.mu.Unlock() - } + return c.observers.On(cb) } func (c *ConfigObserver[T]) Load() *T { - c.mu.Lock() - defer c.mu.Unlock() - return c.conf + return c.conf.Load() } func (c *ConfigObserver[T]) watch() { @@ -165,9 +151,7 @@ func (c *ConfigObserver[T]) load(path string) (*T, error) { d.InitDefaults(conf) } - c.mu.Lock() - c.conf = conf - c.mu.Unlock() + c.conf.Store(conf) return conf, err }