diff --git a/controller/events/dispatcher.go b/controller/events/dispatcher.go index 748f175ff..c0c0905c2 100644 --- a/controller/events/dispatcher.go +++ b/controller/events/dispatcher.go @@ -47,6 +47,7 @@ func NewDispatcher(closeNotify <-chan struct{}) *Dispatcher { result := &Dispatcher{ closeNotify: closeNotify, entityChangeEventsDispatcher: entityChangeEventDispatcher{ + closeNotify: closeNotify, notifyCh: make(chan struct{}, 1), globalMetadata: map[string]any{}, }, diff --git a/controller/events/dispatcher_entity_change.go b/controller/events/dispatcher_entity_change.go index 5ef20f978..54e35874b 100644 --- a/controller/events/dispatcher_entity_change.go +++ b/controller/events/dispatcher_entity_change.go @@ -163,6 +163,7 @@ func bytesToTxId(b []byte) uint64 { type entityChangeEventDispatcher struct { network *network.Network dispatcher *Dispatcher + closeNotify <-chan struct{} notifyCh chan struct{} globalMetadata map[string]any } @@ -298,12 +299,18 @@ func (self *entityChangeEventDispatcher) notifyFlush() { func (self *entityChangeEventDispatcher) flushLoop() { for { // wait to be notified of an event - <-self.notifyCh + select { + case <-self.closeNotify: + return + case <-self.notifyCh: + } // wait until we've not gotten an event for 5 seconds before cleaning up flushed := false for !flushed { select { + case <-self.closeNotify: + return case <-self.notifyCh: case <-time.After(5 * time.Second): pfxlog.Logger().Debug("cleaning up entity change events")