diff --git a/stores/sqlstore.go b/stores/sqlstore.go index 24fd67ff..23ff5254 100644 --- a/stores/sqlstore.go +++ b/stores/sqlstore.go @@ -909,12 +909,6 @@ func (s *SQLStore) Recover() (*RecoveredState, error) { msgStore.last = last msgStore.totalCount = totalCount msgStore.totalBytes = totalBytes - // Since messages may have been removed due to limits, update first/last - // based on known max sequence. - if maxseq > msgStore.last { - msgStore.first = maxseq + 1 - msgStore.last = maxseq - } subStore := s.newSQLSubStore(channelID, &channelLimits.SubStoreLimits) // Prevent scheduling to flusher while we are recovering @@ -988,11 +982,24 @@ func (s *SQLStore) Recover() (*RecoveredState, error) { } // Add to the recovered subscriptions + if sub.LastSent > maxseq { + maxseq = sub.LastSent + } subscriptions = append(subscriptions, &RecoveredSubscription{Sub: sub, Pending: pendingAcks}) + } else if lastSent > maxseq { + maxseq = lastSent } } subRows.Close() + // Since messages may have been removed due to limits, or having a higher + // last_sent in some of the subscription, update first/last based on known + // max sequence. + if maxseq > msgStore.last { + msgStore.first = maxseq + 1 + msgStore.last = maxseq + } + if !s.opts.NoCaching { // Clear but also allow scheduling now that the recovery is complete. subStore.cache.needsFlush = false diff --git a/stores/sqlstore_test.go b/stores/sqlstore_test.go index f665abca..9546d3f1 100644 --- a/stores/sqlstore_test.go +++ b/stores/sqlstore_test.go @@ -2212,3 +2212,108 @@ func TestSQLBulkInsertLimit(t *testing.T) { } } } + +func TestSQLStoreMaxSeqWithExpiredMsgs(t *testing.T) { + if !doSQL { + t.SkipNow() + } + + cleanupSQLDatastore(t) + defer cleanupSQLDatastore(t) + + // Create store with caching enabled + limits := testDefaultStoreLimits + limits.MaxAge = 250 * time.Millisecond + s, err := NewSQLStore(testLogger, testSQLDriver, testSQLSource, &limits, SQLNoCaching(false)) + if err != nil { + t.Fatalf("Error creating store: %v", err) + } + defer s.Close() + + info := testDefaultServerInfo + if err := s.Init(&info); err != nil { + t.Fatalf("Error on init: %v", err) + } + + cs := storeCreateChannel(t, s, "foo") + + sub := spb.SubState{ + ClientID: "me", + AckWaitInSecs: 30, + IsDurable: true, + MaxInFlight: 1024, + QGroup: "dur:queue", + Inbox: "my.inbox", + AckInbox: "ack.inbox", + } + if err := cs.Subs.CreateSub(&sub); err != nil { + t.Fatalf("Error creating sub: %v", err) + } + + for seq := uint64(1); seq <= 100; seq++ { + msg := &pb.MsgProto{ + Sequence: seq, + Subject: "foo", + Data: []byte(fmt.Sprintf("%v", seq)), + Timestamp: time.Now().UnixNano(), + } + if _, err := cs.Msgs.Store(msg); err != nil { + t.Fatalf("Error storing message: %v", err) + } + } + if err := cs.Msgs.Flush(); err != nil { + t.Fatalf("Error on flush: %v", err) + } + + for seq := uint64(1); seq <= 100; seq++ { + cs.Subs.AddSeqPending(sub.ID, seq) + } + for seq := uint64(1); seq <= 98; seq++ { + cs.Subs.AckSeqPending(sub.ID, seq) + } + sub.IsClosed = true + cs.Subs.UpdateSub(&sub) + cs.Subs.Flush() + + s.Close() + + // Wait for more than max age + time.Sleep(500 * time.Millisecond) + + for i := 0; i < 2; i++ { + s, err = NewSQLStore(testLogger, testSQLDriver, testSQLSource, &limits, SQLNoCaching(false)) + if err != nil { + t.Fatalf("Error creating store: %v", err) + } + defer s.Close() + + // Recover state + rs, err := s.Recover() + if err != nil { + t.Fatalf("Error on recovery: %v", err) + } + c, ok := rs.Channels["foo"] + if !ok { + t.Fatal("Did not recover channel foo") + } + cs = c.Channel + if i > 0 { + first, last, err := cs.Msgs.FirstAndLastSequence() + if err != nil { + t.Fatalf("Error getting first/last seq: %v", err) + } + if first != 101 && last != 100 { + t.Fatalf("Unexpected first/last sequence: %v/%v", first, last) + } + + if l := len(c.Subscriptions); l != 1 { + t.Fatalf("Expected 1 sub, got %v", l) + } + ts := c.Subscriptions[0] + if ts.Sub.LastSent != 100 { + t.Fatalf("Unexpected sub's LastSent: %v", ts.Sub.LastSent) + } + } + s.Close() + } +}