Skip to content

Commit

Permalink
The Nats-TTL header is now duration in seconds (previously TS in na…
Browse files Browse the repository at this point in the history
…noseconds)

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Dec 18, 2024
1 parent 3442dc5 commit ac88b0d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 18 deletions.
9 changes: 6 additions & 3 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1469,8 +1469,9 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
mb.msgs++
mb.bytes += uint64(rl)
if ttl > 0 {
expires := time.Duration(ts) + (time.Second * time.Duration(ttl))
mb.fs.ttls.Add(seq, int64(expires))
mb.ttls++
mb.fs.ttls.Add(seq, ttl)
}
}

Expand Down Expand Up @@ -1881,7 +1882,8 @@ func (fs *fileStore) recoverTTLState() error {
continue
}
if ttl := getMessageTTL(msg.hdr); ttl > 0 {
fs.ttls.Add(seq, ttl)
expires := time.Duration(msg.ts) + (time.Second * time.Duration(ttl))
fs.ttls.Add(seq, int64(expires))
if seq > fs.ttlseq {
fs.ttlseq = seq
}
Expand Down Expand Up @@ -3845,7 +3847,8 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t

// Per-message TTL.
if fs.ttls != nil && ttl > 0 {
fs.ttls.Add(seq, ttl)
expires := time.Duration(ts) + (time.Second * time.Duration(ttl))
fs.ttls.Add(seq, int64(expires))
fs.lmb.ttls++
if seq > fs.ttlseq {
fs.ttlseq = seq
Expand Down
13 changes: 7 additions & 6 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8304,7 +8304,8 @@ func TestFileStoreMessageTTL(t *testing.T) {
require_NoError(t, err)
defer fs.Stop()

ttl := time.Now().Add(time.Second / 2).UnixNano()
ttl := int64(1) // 1 second

for i := 1; i <= 10; i++ {
_, _, err = fs.StoreMsg("test", nil, nil, ttl)
require_NoError(t, err)
Expand All @@ -8316,7 +8317,7 @@ func TestFileStoreMessageTTL(t *testing.T) {
require_Equal(t, ss.LastSeq, 10)
require_Equal(t, ss.Msgs, 10)

time.Sleep(time.Second)
time.Sleep(time.Second * 2)

fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, 11)
Expand All @@ -8334,7 +8335,7 @@ func TestFileStoreMessageTTLRestart(t *testing.T) {
require_NoError(t, err)
defer fs.Stop()

ttl := time.Now().Add(time.Second / 2).UnixNano()
ttl := int64(1) // 1 second

for i := 1; i <= 10; i++ {
_, _, err = fs.StoreMsg("test", nil, nil, ttl)
Expand All @@ -8361,7 +8362,7 @@ func TestFileStoreMessageTTLRestart(t *testing.T) {
require_Equal(t, ss.LastSeq, 10)
require_Equal(t, ss.Msgs, 10)

time.Sleep(time.Second)
time.Sleep(time.Second * 2)

fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, 11)
Expand All @@ -8383,7 +8384,7 @@ func TestFileStoreMessageTTLRecovered(t *testing.T) {
require_NoError(t, err)
defer fs.Stop()

ttl := time.Now().Add(time.Second / 2).UnixNano()
ttl := int64(1) // 1 second

for i := 1; i <= 10; i++ {
// When the timed hash wheel state is deleted, the only way we can recover
Expand Down Expand Up @@ -8419,7 +8420,7 @@ func TestFileStoreMessageTTLRecovered(t *testing.T) {
require_Equal(t, ss.LastSeq, 10)
require_Equal(t, ss.Msgs, 10)

time.Sleep(time.Second)
time.Sleep(time.Second * 2)

fs.FastState(&ss)
require_Equal(t, ss.FirstSeq, 11)
Expand Down
15 changes: 6 additions & 9 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24756,9 +24756,8 @@ func TestJetStreamMessageTTL(t *testing.T) {
Header: nats.Header{},
}

ttl := time.Now().Add(time.Second / 2).UnixNano()
for i := 1; i <= 10; i++ {
msg.Header.Set("Nats-TTL", fmt.Sprintf("%d", ttl))
msg.Header.Set("Nats-TTL", "1")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}
Expand All @@ -24769,7 +24768,7 @@ func TestJetStreamMessageTTL(t *testing.T) {
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 10)

time.Sleep(time.Second)
time.Sleep(time.Second * 2)

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
Expand All @@ -24796,9 +24795,8 @@ func TestJetStreamMessageTTLRestart(t *testing.T) {
Header: nats.Header{},
}

ttl := time.Now().Add(time.Second / 2).UnixNano()
for i := 1; i <= 10; i++ {
msg.Header.Set("Nats-TTL", fmt.Sprintf("%d", ttl))
msg.Header.Set("Nats-TTL", "1")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}
Expand All @@ -24824,7 +24822,7 @@ func TestJetStreamMessageTTLRestart(t *testing.T) {
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 10)

time.Sleep(time.Second)
time.Sleep(time.Second * 2)

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
Expand All @@ -24851,9 +24849,8 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) {
Header: nats.Header{},
}

ttl := time.Now().Add(time.Second / 2).UnixNano()
for i := 1; i <= 10; i++ {
msg.Header.Set("Nats-TTL", fmt.Sprintf("%d", ttl))
msg.Header.Set("Nats-TTL", "1")
_, err := js.PublishMsg(msg)
require_NoError(t, err)
}
Expand Down Expand Up @@ -24882,7 +24879,7 @@ func TestJetStreamMessageTTLRecovered(t *testing.T) {
require_Equal(t, si.State.FirstSeq, 1)
require_Equal(t, si.State.LastSeq, 10)

time.Sleep(time.Second)
time.Sleep(time.Second * 2)

si, err = js.StreamInfo("TEST")
require_NoError(t, err)
Expand Down

0 comments on commit ac88b0d

Please sign in to comment.