From f850ffcea024e6cb7c940e8e88b02407ee968bbf Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 19 Dec 2024 11:23:38 +0100 Subject: [PATCH] Recover TTLs without index.db & recover if only one message Signed-off-by: Maurice van Veen --- server/filestore.go | 9 +++++-- server/filestore_test.go | 55 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 86dbb76d49..7601f0faad 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -479,6 +479,11 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim fs.dirty++ } + err = fs.recoverTTLState() + if err != nil && !os.IsNotExist(err) { + fs.warn("Recovering TTL state from index errored: %v", err) + } + // Also make sure we get rid of old idx and fss files on return. // Do this in separate go routine vs inline and at end of processing. defer func() { @@ -1815,7 +1820,7 @@ func (fs *fileStore) recoverFullState() (rerr error) { return errCorruptState } - return fs.recoverTTLState() + return nil } func (fs *fileStore) recoverTTLState() error { @@ -1844,7 +1849,7 @@ func (fs *fileStore) recoverTTLState() error { } defer fs.resetAgeChk(0) - if fs.ttlseq < fs.state.LastSeq { + if fs.ttlseq <= fs.state.LastSeq { fs.warn("TTL state is outdated; attempting to recover using linear scan (seq %d to %d)", fs.ttlseq, fs.state.LastSeq) var sm StoreMsg mb := fs.selectMsgBlock(fs.ttlseq) diff --git a/server/filestore_test.go b/server/filestore_test.go index ed1ca76904..1e3cd47a78 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -8428,3 +8428,58 @@ func TestFileStoreMessageTTLRecovered(t *testing.T) { require_Equal(t, ss.Msgs, 0) }) } + +func TestFileStoreMessageTTLRecoveredSingleMessageWithoutStreamState(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + dir := t.TempDir() + + t.Run("BeforeRestart", func(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: dir, srv: s}, + StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + ttl := int64(1) // 1 second + hdr := fmt.Appendf(nil, "NATS/1.0\r\n%s: %d\r\n", JSMessageTTL, ttl) + _, _, err = fs.StoreMsg("test", hdr, nil, ttl) + require_NoError(t, err) + + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 1) + require_Equal(t, ss.Msgs, 1) + }) + + t.Run("AfterRestart", func(t *testing.T) { + // Delete the stream state file so that we need to rebuild. + fn := filepath.Join(dir, msgDir, streamStreamStateFile) + require_NoError(t, os.RemoveAll(fn)) + // Delete the timed hash wheel state so that we are forced to do a linear scan + // of message blocks containing TTL'd messages. + fn = filepath.Join(dir, msgDir, ttlStreamStateFile) + require_NoError(t, os.RemoveAll(fn)) + + fs, err := newFileStore( + FileStoreConfig{StoreDir: dir, srv: s}, + StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + var ss StreamState + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 1) + require_Equal(t, ss.LastSeq, 1) + require_Equal(t, ss.Msgs, 1) + + time.Sleep(time.Second * 2) + + fs.FastState(&ss) + require_Equal(t, ss.FirstSeq, 2) + require_Equal(t, ss.LastSeq, 1) + require_Equal(t, ss.Msgs, 0) + }) +}