Skip to content

Commit

Permalink
Remove xbit as not backward-compatible, scan headers instead
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander committed Dec 19, 2024
1 parent f850ffc commit 4b5b21e
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 28 deletions.
39 changes: 12 additions & 27 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1394,12 +1394,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {

hasHeaders := rl&hbit != 0
var ttl int64
if rl&xbit != 0 {
if len(hdr) > 0 {
ttl = getMessageTTL(hdr)
}
// Clear any headers bit that could be set.
rl &^= hbit
rl &^= xbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
Expand Down Expand Up @@ -1860,7 +1859,7 @@ func (fs *fileStore) recoverTTLState() error {
for seq := fs.ttlseq; seq <= fs.state.LastSeq; seq++ {
retry:
if mb.ttls == 0 {
// None of the messages in the block are marked with an xbit so don't
// None of the messages in the block have message TTLs so don't
// bother doing anything further with this block, skip to the end.
seq = atomic.LoadUint64(&mb.last.seq) + 1
}
Expand Down Expand Up @@ -3783,7 +3782,7 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts, t
// Write msg record.
// Add expiry bit to sequence if needed. This is so that if we need to
// rebuild, we know which messages to look at more quickly.
n, err := fs.writeMsgRecord(seq, ts, subj, hdr, msg, ttl)
n, err := fs.writeMsgRecord(seq, ts, subj, hdr, msg)
if err != nil {
return err
}
Expand Down Expand Up @@ -3948,7 +3947,7 @@ func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
mb.mu.Unlock()

if needsRecord {
mb.writeMsgRecord(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, nowts, true, 0)
mb.writeMsgRecord(emptyRecordLen, seq|ebit, _EMPTY_, nil, nil, nowts, true)
} else {
mb.kickFlusher()
}
Expand Down Expand Up @@ -4046,7 +4045,7 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
mb.mu.Unlock()

// Write out our placeholder.
mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true, 0)
mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)

// Now update FS accounting.
// Update fs state.
Expand Down Expand Up @@ -4596,7 +4595,6 @@ func (mb *msgBlock) compactWithFloor(floor uint64) {
rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:])
// Clear any headers bit that could be set.
rl &^= hbit
rl &^= xbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh || index+rl > lbuf {
Expand Down Expand Up @@ -5398,12 +5396,12 @@ func (mb *msgBlock) enableForWriting(fip bool) error {

// Helper function to place a delete tombstone.
func (mb *msgBlock) writeTombstone(seq uint64, ts int64) error {
return mb.writeMsgRecord(emptyRecordLen, seq|tbit, _EMPTY_, nil, nil, ts, true, 0)
return mb.writeMsgRecord(emptyRecordLen, seq|tbit, _EMPTY_, nil, nil, ts, true)
}

// Will write the message record to the underlying message block.
// filestore lock will be held.
func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush bool, ttl int64) error {
func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte, ts int64, flush bool) error {
mb.mu.Lock()
defer mb.mu.Unlock()

Expand Down Expand Up @@ -5456,12 +5454,6 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
l |= hbit
}

// TODO(nat): Only do if hbit set too?
// TODO(nat): ... or parse the header here?
if ttl > 0 {
l |= xbit
}

le.PutUint32(hdr[0:], l)
le.PutUint64(hdr[4:], seq)
le.PutUint64(hdr[12:], uint64(ts))
Expand Down Expand Up @@ -5631,12 +5623,12 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) {
}

// Lock should be held.
func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg []byte, ttl int64) (uint64, error) {
func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg []byte) (uint64, error) {
var err error

// Get size for this message.
rl := fileStoreMsgSize(subj, hdr, msg)
if rl&hbit != 0 || rl&xbit != 0 {
if rl&hbit != 0 {
return 0, ErrMsgTooLarge
}
// Grab our current last message block.
Expand All @@ -5657,7 +5649,7 @@ func (fs *fileStore) writeMsgRecord(seq uint64, ts int64, subj string, hdr, msg
}

// Ask msg block to store in write through cache.
err = mb.writeMsgRecord(rl, seq, subj, hdr, msg, ts, fs.fip, ttl)
err = mb.writeMsgRecord(rl, seq, subj, hdr, msg, ts, fs.fip)

return rl, err
}
Expand Down Expand Up @@ -6104,11 +6096,9 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
hdr := buf[index : index+msgHdrSize]
rl, slen := le.Uint32(hdr[0:]), int(le.Uint16(hdr[20:]))
seq = le.Uint64(hdr[4:])
ttl := rl&xbit != 0

// Clear any headers bit that could be set.
rl &^= hbit
rl &^= xbit
dlen := int(rl) - msgHdrSize

// Do some quick sanity checks here.
Expand All @@ -6129,9 +6119,8 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
erased := seq&ebit != 0
seq = seq &^ ebit

// TODO(nat): If we have lost the THW here, should we parse the header to find the
// original TTL from the message?
if ttl {
// TODO(nat): Not terribly optimal...
if len(hdr) > 0 && getMessageTTL(hdr) > 0 {
mb.ttls++
}

Expand Down Expand Up @@ -6613,8 +6602,6 @@ const (
tbit = 1 << 62
// Used to mark an index as deleted and non-existent.
dbit = 1 << 30
// Used for marking messages with expiries/TTLs.
xbit = 1 << 29
)

// Will do a lookup from cache.
Expand Down Expand Up @@ -6770,7 +6757,6 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
rl := le.Uint32(hdr[0:])
hasHeaders := rl&hbit != 0
rl &^= hbit // clear header bit
rl &^= xbit // clear ttl bit
dlen := int(rl) - msgHdrSize
slen := int(le.Uint16(hdr[20:]))
// Simple sanity check.
Expand Down Expand Up @@ -7881,7 +7867,6 @@ func (mb *msgBlock) tombsLocked() []msgId {
rl, seq := le.Uint32(hdr[0:]), le.Uint64(hdr[4:])
// Clear any headers bit that could be set.
rl &^= hbit
rl &^= xbit
// Check for tombstones.
if seq&tbit != 0 {
ts := int64(le.Uint64(hdr[12:]))
Expand Down
2 changes: 1 addition & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5736,7 +5736,7 @@ func TestFileStoreMsgBlockHolesAndIndexing(t *testing.T) {
mb := fs.getFirstBlock()
writeMsg := func(subj string, seq uint64) {
rl := fileStoreMsgSize(subj, nil, []byte(subj))
require_NoError(t, mb.writeMsgRecord(rl, seq, subj, nil, []byte(subj), time.Now().UnixNano(), true, 0))
require_NoError(t, mb.writeMsgRecord(rl, seq, subj, nil, []byte(subj), time.Now().UnixNano(), true))
fs.rebuildState(nil)
}
readMsg := func(seq uint64, expectedSubj string) {
Expand Down

0 comments on commit 4b5b21e

Please sign in to comment.