From ac88b0db83f426449ac4f2042138cf7cd6531e0e Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 18 Dec 2024 13:32:31 +0000 Subject: [PATCH] The `Nats-TTL` header is now duration in seconds (previously TS in nanoseconds) Signed-off-by: Neil Twigg --- server/filestore.go | 9 ++++++--- server/filestore_test.go | 13 +++++++------ server/jetstream_test.go | 15 ++++++--------- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 06e6b42c47..86dbb76d49 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -1469,8 +1469,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) { mb.msgs++ mb.bytes += uint64(rl) if ttl > 0 { + expires := time.Duration(ts) + (time.Second * time.Duration(ttl)) + mb.fs.ttls.Add(seq, int64(expires)) mb.ttls++ - mb.fs.ttls.Add(seq, ttl) } } @@ -1881,7 +1882,8 @@ func (fs *fileStore) recoverTTLState() error { continue } if ttl := getMessageTTL(msg.hdr); ttl > 0 { - fs.ttls.Add(seq, ttl) + expires := time.Duration(msg.ts) + (time.Second * time.Duration(ttl)) + fs.ttls.Add(seq, int64(expires)) if seq > fs.ttlseq { fs.ttlseq = seq } @@ -3845,7 +3847,8 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t // Per-message TTL. if fs.ttls != nil && ttl > 0 { - fs.ttls.Add(seq, ttl) + expires := time.Duration(ts) + (time.Second * time.Duration(ttl)) + fs.ttls.Add(seq, int64(expires)) fs.lmb.ttls++ if seq > fs.ttlseq { fs.ttlseq = seq diff --git a/server/filestore_test.go b/server/filestore_test.go index db327e4881..ed1ca76904 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -8304,7 +8304,8 @@ func TestFileStoreMessageTTL(t *testing.T) { require_NoError(t, err) defer fs.Stop() - ttl := time.Now().Add(time.Second / 2).UnixNano() + ttl := int64(1) // 1 second + for i := 1; i <= 10; i++ { _, _, err = fs.StoreMsg("test", nil, nil, ttl) require_NoError(t, err) @@ -8316,7 +8317,7 @@ func TestFileStoreMessageTTL(t *testing.T) { require_Equal(t, ss.LastSeq, 10) require_Equal(t, ss.Msgs, 10) - time.Sleep(time.Second) + time.Sleep(time.Second * 2) fs.FastState(&ss) require_Equal(t, ss.FirstSeq, 11) @@ -8334,7 +8335,7 @@ func TestFileStoreMessageTTLRestart(t *testing.T) { require_NoError(t, err) defer fs.Stop() - ttl := time.Now().Add(time.Second / 2).UnixNano() + ttl := int64(1) // 1 second for i := 1; i <= 10; i++ { _, _, err = fs.StoreMsg("test", nil, nil, ttl) @@ -8361,7 +8362,7 @@ func TestFileStoreMessageTTLRestart(t *testing.T) { require_Equal(t, ss.LastSeq, 10) require_Equal(t, ss.Msgs, 10) - time.Sleep(time.Second) + time.Sleep(time.Second * 2) fs.FastState(&ss) require_Equal(t, ss.FirstSeq, 11) @@ -8383,7 +8384,7 @@ func TestFileStoreMessageTTLRecovered(t *testing.T) { require_NoError(t, err) defer fs.Stop() - ttl := time.Now().Add(time.Second / 2).UnixNano() + ttl := int64(1) // 1 second for i := 1; i <= 10; i++ { // When the timed hash wheel state is deleted, the only way we can recover @@ -8419,7 +8420,7 @@ func TestFileStoreMessageTTLRecovered(t *testing.T) { require_Equal(t, ss.LastSeq, 10) require_Equal(t, ss.Msgs, 10) - time.Sleep(time.Second) + time.Sleep(time.Second * 2) fs.FastState(&ss) require_Equal(t, ss.FirstSeq, 11) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 0fca438710..403a8c6d91 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -24756,9 +24756,8 @@ func TestJetStreamMessageTTL(t *testing.T) { Header: nats.Header{}, } - ttl := time.Now().Add(time.Second / 2).UnixNano() for i := 1; i <= 10; i++ { - msg.Header.Set("Nats-TTL", fmt.Sprintf("%d", ttl)) + msg.Header.Set("Nats-TTL", "1") _, err := js.PublishMsg(msg) require_NoError(t, err) } @@ -24769,7 +24768,7 @@ func TestJetStreamMessageTTL(t *testing.T) { require_Equal(t, si.State.FirstSeq, 1) require_Equal(t, si.State.LastSeq, 10) - time.Sleep(time.Second) + time.Sleep(time.Second * 2) si, err = js.StreamInfo("TEST") require_NoError(t, err) @@ -24796,9 +24795,8 @@ func TestJetStreamMessageTTLRestart(t *testing.T) { Header: nats.Header{}, } - ttl := time.Now().Add(time.Second / 2).UnixNano() for i := 1; i <= 10; i++ { - msg.Header.Set("Nats-TTL", fmt.Sprintf("%d", ttl)) + msg.Header.Set("Nats-TTL", "1") _, err := js.PublishMsg(msg) require_NoError(t, err) } @@ -24824,7 +24822,7 @@ func TestJetStreamMessageTTLRestart(t *testing.T) { require_Equal(t, si.State.FirstSeq, 1) require_Equal(t, si.State.LastSeq, 10) - time.Sleep(time.Second) + time.Sleep(time.Second * 2) si, err = js.StreamInfo("TEST") require_NoError(t, err) @@ -24851,9 +24849,8 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) { Header: nats.Header{}, } - ttl := time.Now().Add(time.Second / 2).UnixNano() for i := 1; i <= 10; i++ { - msg.Header.Set("Nats-TTL", fmt.Sprintf("%d", ttl)) + msg.Header.Set("Nats-TTL", "1") _, err := js.PublishMsg(msg) require_NoError(t, err) } @@ -24882,7 +24879,7 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) { require_Equal(t, si.State.FirstSeq, 1) require_Equal(t, si.State.LastSeq, 10) - time.Sleep(time.Second) + time.Sleep(time.Second * 2) si, err = js.StreamInfo("TEST") require_NoError(t, err)