Skip to content

Commit

Permalink
Recover TTLs without index.db & recover if only one message
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen committed Dec 19, 2024
1 parent ac88b0d commit f850ffc
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
9 changes: 7 additions & 2 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,11 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
fs.dirty++
}

err = fs.recoverTTLState()
if err != nil && !os.IsNotExist(err) {
fs.warn("Recovering TTL state from index errored: %v", err)
}

// Also make sure we get rid of old idx and fss files on return.
// Do this in separate go routine vs inline and at end of processing.
defer func() {
Expand Down Expand Up @@ -1815,7 +1820,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return errCorruptState
}

return fs.recoverTTLState()
return nil
}

func (fs *fileStore) recoverTTLState() error {
Expand Down Expand Up @@ -1844,7 +1849,7 @@ func (fs *fileStore) recoverTTLState() error {
}

defer fs.resetAgeChk(0)
if fs.ttlseq < fs.state.LastSeq {
if fs.ttlseq <= fs.state.LastSeq {
fs.warn("TTL state is outdated; attempting to recover using linear scan (seq %d to %d)", fs.ttlseq, fs.state.LastSeq)
var sm StoreMsg
mb := fs.selectMsgBlock(fs.ttlseq)
Expand Down
55 changes: 55 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8428,3 +8428,58 @@ func TestFileStoreMessageTTLRecovered(t *testing.T) {
require_Equal(t, ss.Msgs, 0)
})
}

func TestFileStoreMessageTTLRecoveredSingleMessageWithoutStreamState(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

dir := t.TempDir()

t.Run("BeforeRestart", func(t *testing.T) {
fs, err := newFileStore(
FileStoreConfig{StoreDir: dir, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

ttl := int64(1) // 1 second
hdr := fmt.Appendf(nil, "NATS/1.0\r\n%s: %d\r\n", JSMessageTTL, ttl)
_, _, err = fs.StoreMsg("test", hdr, nil, ttl)
require_NoError(t, err)

var ss StreamState
fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, 1)
require_Equal(t, ss.LastSeq, 1)
require_Equal(t, ss.Msgs, 1)
})

t.Run("AfterRestart", func(t *testing.T) {
// Delete the stream state file so that we need to rebuild.
fn := filepath.Join(dir, msgDir, streamStreamStateFile)
require_NoError(t, os.RemoveAll(fn))
// Delete the timed hash wheel state so that we are forced to do a linear scan
// of message blocks containing TTL'd messages.
fn = filepath.Join(dir, msgDir, ttlStreamStateFile)
require_NoError(t, os.RemoveAll(fn))

fs, err := newFileStore(
FileStoreConfig{StoreDir: dir, srv: s},
StreamConfig{Name: "zzz", Subjects: []string{"test"}, Storage: FileStorage})
require_NoError(t, err)
defer fs.Stop()

var ss StreamState
fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, 1)
require_Equal(t, ss.LastSeq, 1)
require_Equal(t, ss.Msgs, 1)

time.Sleep(time.Second * 2)

fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, 2)
require_Equal(t, ss.LastSeq, 1)
require_Equal(t, ss.Msgs, 0)
})
}

0 comments on commit f850ffc

Please sign in to comment.