Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable JetStream on disk errors #6292

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,14 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim

// Do age checks too, make sure to call in place.
if fs.cfg.MaxAge != 0 {
fs.expireMsgsOnRecover()
err := fs.expireMsgsOnRecover()
if err != nil && err == errFileSystemPermissionDenied {
fs.srv.Warnf("file system permission denied while expiring msgs, disabling jetstream: %v", err)
// In the worst case, messages still held in the block cache could be lost.
// In clustered mode this is very unlikely because the data is replicated.
fs.srv.DisableJetStream()
return nil, err
}
fs.startAgeChk()
}

Expand Down Expand Up @@ -2088,9 +2095,9 @@ func (fs *fileStore) recoverMsgs() error {
// We will treat this differently in case we have a recovery
// that will expire a lot of messages on startup.
// Should only be called on startup.
func (fs *fileStore) expireMsgsOnRecover() {
func (fs *fileStore) expireMsgsOnRecover() error {
if fs.state.Msgs == 0 {
return
return nil
}

var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge)
Expand All @@ -2102,7 +2109,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
// usually taken care of by fs.removeMsgBlock() but we do not call that here.
var last msgId

deleteEmptyBlock := func(mb *msgBlock) {
deleteEmptyBlock := func(mb *msgBlock) error {
// If we are the last keep state to remember first/last sequence.
// Do this part by hand since not deleting one by one.
if mb == fs.lmb {
Expand All @@ -2118,8 +2125,12 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
return true
})
mb.dirtyCloseWithRemove(true)
err := mb.dirtyCloseWithRemove(true)
if err != nil && err == errFileSystemPermissionDenied {
return err
}
deleted++
return nil
}

for _, mb := range fs.blks {
Expand All @@ -2133,7 +2144,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
if mb.last.ts <= minAge {
purged += mb.msgs
bytes += mb.bytes
deleteEmptyBlock(mb)
err := deleteEmptyBlock(mb)
if err != nil && err == errFileSystemPermissionDenied {
mb.mu.Unlock()
return err
}
mb.mu.Unlock()
continue
}
Expand Down Expand Up @@ -2258,6 +2273,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
if purged > 0 {
fs.dirty++
}
return nil
}

func copyMsgBlocks(src []*msgBlock) []*msgBlock {
Expand Down Expand Up @@ -3686,6 +3702,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
dios <- struct{}{}

if err != nil {
if os.IsPermission(err) {
return nil, err
}
mb.dirtyCloseWithRemove(true)
return nil, fmt.Errorf("Error creating msg block file: %v", err)
}
Expand Down Expand Up @@ -6610,6 +6629,7 @@ var (
errNoMainKey = errors.New("encrypted store encountered with no main key")
errNoBlkData = errors.New("message block data missing")
errStateTooBig = errors.New("store state too big for optional write")
errFileSystemPermissionDenied = errors.New("storage directory not writeable")
)

const (
Expand Down Expand Up @@ -8076,9 +8096,9 @@ func (mb *msgBlock) dirtyClose() {
}

// Should be called with lock held.
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error {
if mb == nil {
return
return nil
}
// Stop cache expiration timer.
if mb.ctmr != nil {
Expand All @@ -8100,13 +8120,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
// Clear any tracking by subject if we are removing.
mb.fss = nil
if mb.mfn != _EMPTY_ {
os.Remove(mb.mfn)
err := os.Remove(mb.mfn)
if err != nil && os.IsPermission(err){
return errFileSystemPermissionDenied
}
mb.mfn = _EMPTY_
}
if mb.kfn != _EMPTY_ {
os.Remove(mb.kfn)
err := os.Remove(mb.kfn)
if err != nil && os.IsPermission(err){
return errFileSystemPermissionDenied
}
}
}
return nil
}

// Remove a seq from the fss and select new first.
Expand Down Expand Up @@ -8523,7 +8550,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
for {
select {
case <-t.C:
fs.writeFullState()
err := fs.writeFullState()
if err != nil && os.IsPermission(err) {
fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err)
// In the worst case, messages still held in the block cache could be lost.
// In clustered mode this is very unlikely because the data is replicated.
fs.srv.DisableJetStream()
return
}

case <-qch:
return
}
Expand Down Expand Up @@ -8732,6 +8767,10 @@ func (fs *fileStore) _writeFullState(force bool) error {
// Protect with dios.
<-dios
err := os.WriteFile(fn, buf, defaultFilePerms)
// If the file system is not writable, os.IsPermission(err) reports true.
if err != nil && os.IsPermission(err) {
return err
}
dios <- struct{}{}

// Update dirty if successful.
Expand Down
80 changes: 79 additions & 1 deletion server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"math/bits"
"math/rand"
"os"
Expand Down Expand Up @@ -144,9 +145,9 @@ func TestFileStoreBasics(t *testing.T) {
func TestFileStoreMsgHeaders(t *testing.T) {
testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) {
fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil)

require_NoError(t, err)
defer fs.Stop()

subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World")
elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8
if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen {
Expand Down Expand Up @@ -8483,3 +8484,80 @@ func TestFileStoreMessageTTLRecoveredSingleMessageWithoutStreamState(t *testing.
require_Equal(t, ss.Msgs, 0)
})
}
// TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly checks that StoreMsg
// reports a permission error once the store directory tree has been made
// read-only.
func TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
	// Root is not subject to permission bits, so a read-only mode would not
	// block writes and the expected error could never be produced.
	if os.Geteuid() == 0 {
		t.Skip("permission bits are not enforced for root")
	}

	cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
	fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg)
	require_NoError(t, err)
	defer fs.Stop()

	directory := fs.fcfg.StoreDir
	info, err := os.Stat(directory)
	require_NoError(t, err)
	originalMode := info.Mode()

	// Registered after fs.Stop, so it runs first: the tree is writable again
	// before the store shuts down and before t.TempDir removes the directory,
	// even when an assertion below aborts the test.
	defer changeDirectoryPermission(directory, originalMode)

	const readOnlyMode = os.FileMode(0o555)
	require_NoError(t, changeDirectoryPermission(directory, readOnlyMode))

	msg := bytes.Repeat([]byte("Z"), 1024)
	const totalMsgs = 10000
	// Keep storing until the first failure; err holds the last StoreMsg result.
	for i := 0; i < totalMsgs; i++ {
		if _, _, err = fs.StoreMsg("ev.1", nil, msg, 0); err != nil {
			break
		}
	}
	require_Error(t, err, os.ErrPermission)
}

// TestWriteFullStateThrowsPermissionErrorIfFSModeReadOnly checks that
// writeFullState reports a permission error when the store directory tree is
// made read-only after messages have been stored.
func TestWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) {
	// Root is not subject to permission bits, so a read-only mode would not
	// block writes and the expected error could never be produced.
	if os.Geteuid() == 0 {
		t.Skip("permission bits are not enforced for root")
	}

	cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond}
	fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg)
	require_NoError(t, err)
	defer fs.Stop()

	directory := fs.fcfg.StoreDir
	info, err := os.Stat(directory)
	require_NoError(t, err)
	originalMode := info.Mode()

	// Populate the store while the directory is still writable. A failure
	// here is a setup problem and must not be mistaken for the error under test.
	msg := bytes.Repeat([]byte("Z"), 1024)
	const totalMsgs = 10000
	for i := 0; i < totalMsgs; i++ {
		_, _, err = fs.StoreMsg("ev.1", nil, msg, 0)
		require_NoError(t, err)
	}

	// Registered after fs.Stop, so it runs first: the tree is writable again
	// before the store shuts down and before t.TempDir removes the directory,
	// even when an assertion below aborts the test.
	defer changeDirectoryPermission(directory, originalMode)

	const readOnlyMode = os.FileMode(0o555)
	require_NoError(t, changeDirectoryPermission(directory, readOnlyMode))

	err = fs.writeFullState()
	require_Error(t, err, os.ErrPermission)
}

// changeDirectoryPermission walks the tree rooted at directory and applies
// mode to every entry it visits, directories and files alike. It stops at the
// first failure and returns that error wrapped with the offending path.
func changeDirectoryPermission(directory string, mode fs.FileMode) error {
	return filepath.Walk(directory, func(path string, info os.FileInfo, walkErr error) error {
		// The walk itself could not stat or read this entry.
		if walkErr != nil {
			return fmt.Errorf("error accessing path %q: %w", path, walkErr)
		}

		chmodErr := os.Chmod(path, mode)
		switch {
		case chmodErr == nil:
			return nil
		case info.IsDir():
			return fmt.Errorf("error changing directory permissions for %q: %w", path, chmodErr)
		default:
			return fmt.Errorf("error changing file permissions for %q: %w", path, chmodErr)
		}
	})
}
2 changes: 1 addition & 1 deletion server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6086,4 +6086,4 @@ func expandPath(p string) (string, error) {
}

return filepath.Join(home, p[1:]), nil
}
}
8 changes: 8 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5062,6 +5062,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}

if err != nil {
if os.IsPermission(err){
mset.mu.Unlock()
// In the worst case, messages still held in the block cache could be lost.
// In clustered mode this is very unlikely because the data is replicated.
mset.srv.DisableJetStream()
mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err)
return err
}
// If we did not succeed put those values back and increment clfs in case we are clustered.
var state StreamState
mset.store.FastState(&state)
Expand Down