diff --git a/go.mod b/go.mod index 55b6e570..4cb53fe7 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/eapache/channels v1.1.0 github.com/frostbyte73/core v0.0.10 + github.com/fsnotify/fsnotify v1.7.0 github.com/gammazero/deque v0.2.1 github.com/go-jose/go-jose/v3 v3.0.3 github.com/go-logr/logr v1.4.1 diff --git a/go.sum b/go.sum index 3fd42fa7..cb220639 100644 --- a/go.sum +++ b/go.sum @@ -19,6 +19,8 @@ github.com/frostbyte73/core v0.0.10 h1:D4DQXdPb8ICayz0n75rs4UYTXrUSdxzUfeleuNJOR github.com/frostbyte73/core v0.0.10/go.mod h1:XsOGqrqe/VEV7+8vJ+3a8qnCIXNbKsoEiu/czs7nrcU= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/go-jose/go-jose/v3 v3.0.3 h1:fFKWeig/irsp7XD2zBxvnmA/XaRWp5V3CBsZXJF7G7k= diff --git a/utils/configobserver.go b/utils/configobserver.go new file mode 100644 index 00000000..cd5856a4 --- /dev/null +++ b/utils/configobserver.go @@ -0,0 +1,140 @@ +package utils + +import ( + "container/list" + "os" + "sync" + + "github.com/fsnotify/fsnotify" + "gopkg.in/yaml.v3" + + "github.com/livekit/protocol/logger" +) + +type ConfigBuilder[T any] interface { + New() (*T, error) +} + +type ConfigDefaulter[T any] interface { + InitDefaults(*T) +} + +type ConfigObserver[T any] struct { + builder ConfigBuilder[T] + watcher *fsnotify.Watcher + mu sync.Mutex + cbs list.List +} + +func NewConfigObserver[T any](path string, builder ConfigBuilder[T]) (*ConfigObserver[T], *T, error) { + c := &ConfigObserver[T]{ + builder: builder, + } + + config, err := c.load(path) + if err != nil { + return nil, nil, err + } + + if path != "" { + c.watcher, err = fsnotify.NewWatcher() + if err != nil { + return nil, nil, err + } + if err := c.watcher.Add(path); err != nil { + c.watcher.Close() + return nil, nil, err + } + go c.watch() + } + + return c, config, nil +} + +func (c *ConfigObserver[T]) Close() { + if c != nil && c.watcher != nil { + c.watcher.Close() + } +} + +func (c *ConfigObserver[T]) Observe(cb func(*T)) func() { + if c == nil { + return func() {} + } + c.mu.Lock() + e := c.cbs.PushBack(cb) + c.mu.Unlock() + + return func() { + c.mu.Lock() + c.cbs.Remove(e) + c.mu.Unlock() + } +} + +func (c *ConfigObserver[T]) watch() { + for { + select { + case event, ok := <-c.watcher.Events: + if !ok { + return + } + if event.Has(fsnotify.Remove) { + if err := c.watcher.Add(event.Name); err != nil { + logger.Errorw("unable to rewatch config file", err, "file", event.Name) + } + } + if event.Has(fsnotify.Write | fsnotify.Remove) { + if err := c.reload(event.Name); err != nil { + logger.Errorw("unable to update config file", err, "file", event.Name) + } else { + logger.Infow("config file has been updated", "file", event.Name) + } + } + case err, ok := <-c.watcher.Errors: + if !ok { + return + } + logger.Errorw("config file watcher error", err) + } + } +} + +func (c *ConfigObserver[T]) reload(path string) error { + conf, err := c.load(path) + if err != nil { + return err + } + + c.mu.Lock() + defer c.mu.Unlock() + for e := c.cbs.Front(); e != nil; e = e.Next() { + go e.Value.(func(*T))(conf) + } + return nil +} + +func (c *ConfigObserver[T]) load(path string) (*T, error) { + conf, err := c.builder.New() + if err != nil { + return nil, err + } + + if path != "" { + f, err := os.OpenFile(path, os.O_RDONLY, 0644) + if err != nil { + return nil, err + } + defer f.Close() + + if err := yaml.NewDecoder(f).Decode(conf); err != nil { + return nil, err + } + } + + if d, ok := c.builder.(ConfigDefaulter[T]); ok { + d.InitDefaults(conf) + } + + return conf, err +}