diff --git a/server/server.go b/server/server.go index fc945cdb..2496fef4 100644 --- a/server/server.go +++ b/server/server.go @@ -3693,8 +3693,13 @@ func (s *StanServer) performAckExpirationRedelivery(sub *subState, isStartup boo sub.Unlock() return } - // Sort our messages outstanding from acksPending, grab some state and unlock. sub.Lock() + // Subscriber could have been closed + if sub.ackTimer == nil { + sub.Unlock() + return + } + // Sort our messages outstanding from acksPending, grab some state and unlock. sortedPendingMsgs := sub.makeSortedPendingMsgs() if len(sortedPendingMsgs) == 0 { sub.clearAckTimer() diff --git a/server/server_redelivery_test.go b/server/server_redelivery_test.go index d715575c..cb52e196 100644 --- a/server/server_redelivery_test.go +++ b/server/server_redelivery_test.go @@ -16,6 +16,7 @@ package server import ( "encoding/json" "fmt" + "math/rand" "net" "runtime" "sync" @@ -1714,3 +1715,35 @@ func TestRedeliveryRaceWithAck(t *testing.T) { } } + +func TestNoPanicOnSubCloseWhileOnRedelivery(t *testing.T) { + s := runServer(t, clusterName) + defer s.Shutdown() + + sc := NewDefaultConnection(t) + defer sc.Close() + + if err := sc.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + + for i := 0; i < 100; i++ { + sub, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, + stan.AckWait(ackWaitInMs(5)), + stan.SetManualAckMode(), + stan.DeliverAllAvailable()) + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + // Artificially pretend that the client had failed hearbeat + srvSub := s.clients.getSubs(clientName)[0] + srvSub.Lock() + srvSub.hasFailedHB = true + srvSub.Unlock() + + time.Sleep(time.Duration(rand.Intn(15)) * time.Millisecond) + sub.Close() + + waitForNumSubs(t, s, clientName, 0) + } +}