diff --git a/engine.go b/engine.go index 4b672a6..632707d 100644 --- a/engine.go +++ b/engine.go @@ -436,10 +436,13 @@ func (e *Engine) Unsubscribe(o chan Reply, id int64) { return } - newUnObs := []chan<- Reply{} - for _, existing := range e.unObservers { + unObservers := e.unObservers + // in-place removing observer + newUnObs := unObservers[:0] + for i, existing := range unObservers { + unObservers[i] = nil // avoid memory leak of channel if existing != o { - newUnObs = append(newUnObs, o) + newUnObs = append(newUnObs, existing) } } e.unObservers = newUnObs @@ -460,10 +463,12 @@ func (e *Engine) UnsubscribeAll(o chan Reply) { } }() e.sendCommand(func() { - newUnObs := []chan<- Reply{} - for _, existing := range e.allObservers { + allObservers := e.allObservers + newUnObs := allObservers[:0] + for i, existing := range allObservers { + allObservers[i] = nil // avoid memory leak of channel if existing != o { - newUnObs = append(newUnObs, o) + newUnObs = append(newUnObs, existing) } } e.allObservers = newUnObs diff --git a/engine_test.go b/engine_test.go index 14e6ace..b796557 100644 --- a/engine_test.go +++ b/engine_test.go @@ -133,6 +133,31 @@ func TestConnect(t *testing.T) { } } +func TestUnsubscribeAllAndUnmatched(t *testing.T) { + engine := NewTestEngine(t) + defer engine.ConditionalStop(t) + dummy := make(chan Reply) + rc := make(chan Reply) + engine.SubscribeAll(dummy) + engine.SubscribeAll(rc) + engine.UnsubscribeAll(rc) + for _, v := range engine.allObservers { + if v == rc { + t.Log("rc should be unsubscribed from allObservers") + t.Fail() + } + } + engine.Subscribe(dummy, UnmatchedReplyID) + engine.Subscribe(rc, UnmatchedReplyID) + engine.Unsubscribe(rc, UnmatchedReplyID) + for _, v := range engine.unObservers { + if v == rc { + t.Log("rc should be unsubscribed from unObservers") + t.Fail() + } + } +} + func logreply(t *testing.T, reply Reply, err error) { if reply == nil { t.Logf("received reply nil")