diff --git a/server/filestore.go b/server/filestore.go index 8c5907621d..1b13240081 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -515,7 +515,14 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim // Do age checks too, make sure to call in place. if fs.cfg.MaxAge != 0 { - fs.expireMsgsOnRecover() + err := fs.expireMsgsOnRecover() + if err != nil && err == errFileSystemPermissionDenied { + fs.srv.Warnf("file system permission denied while expiring msgs, disabling jetstream: %v", err) + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + fs.srv.DisableJetStream() + return nil, err + } fs.startAgeChk() } @@ -2088,9 +2095,9 @@ func (fs *fileStore) recoverMsgs() error { // We will treat this differently in case we have a recovery // that will expire alot of messages on startup. // Should only be called on startup. -func (fs *fileStore) expireMsgsOnRecover() { +func (fs *fileStore) expireMsgsOnRecover() error { if fs.state.Msgs == 0 { - return + return nil } var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge) @@ -2102,7 +2109,7 @@ func (fs *fileStore) expireMsgsOnRecover() { // usually taken care of by fs.removeMsgBlock() but we do not call that here. var last msgId - deleteEmptyBlock := func(mb *msgBlock) { + deleteEmptyBlock := func(mb *msgBlock) error { // If we are the last keep state to remember first/last sequence. // Do this part by hand since not deleting one by one. if mb == fs.lmb { @@ -2118,8 +2125,12 @@ func (fs *fileStore) expireMsgsOnRecover() { } return true }) - mb.dirtyCloseWithRemove(true) + err := mb.dirtyCloseWithRemove(true) + if err != nil && err == errFileSystemPermissionDenied { + return err + } deleted++ + return nil } for _, mb := range fs.blks { @@ -2133,7 +2144,11 @@ func (fs *fileStore) expireMsgsOnRecover() { if mb.last.ts <= minAge { purged += mb.msgs bytes += mb.bytes - deleteEmptyBlock(mb) + err := deleteEmptyBlock(mb) + if err != nil && err == errFileSystemPermissionDenied { + mb.mu.Unlock() + return err + } mb.mu.Unlock() continue } @@ -2258,6 +2273,7 @@ func (fs *fileStore) expireMsgsOnRecover() { if purged > 0 { fs.dirty++ } + return nil } func copyMsgBlocks(src []*msgBlock) []*msgBlock { @@ -3686,6 +3702,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) { dios <- struct{}{} if err != nil { + if os.IsPermission(err) { + return nil, err + } mb.dirtyCloseWithRemove(true) return nil, fmt.Errorf("Error creating msg block file: %v", err) } @@ -6610,6 +6629,7 @@ var ( errNoMainKey = errors.New("encrypted store encountered with no main key") errNoBlkData = errors.New("message block data missing") errStateTooBig = errors.New("store state too big for optional write") + errFileSystemPermissionDenied = errors.New("storage directory not writeable") ) const ( @@ -8076,9 +8096,9 @@ func (mb *msgBlock) dirtyClose() { } // Should be called with lock held. -func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { +func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error { if mb == nil { - return + return nil } // Stop cache expiration timer. if mb.ctmr != nil { @@ -8100,13 +8120,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) { // Clear any tracking by subject if we are removing. mb.fss = nil if mb.mfn != _EMPTY_ { - os.Remove(mb.mfn) + err := os.Remove(mb.mfn) + if err != nil && os.IsPermission(err){ + return errFileSystemPermissionDenied + } mb.mfn = _EMPTY_ } if mb.kfn != _EMPTY_ { - os.Remove(mb.kfn) + err := os.Remove(mb.kfn) + if err != nil && os.IsPermission(err){ + return errFileSystemPermissionDenied + } } } + return nil } // Remove a seq from the fss and select new first. @@ -8523,7 +8550,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) { for { select { case <-t.C: - fs.writeFullState() + err := fs.writeFullState() + if err != nil && os.IsPermission(err) { + fs.warn("file system permission denied when flushing stream state, disabling jetstream %v", err) + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + fs.srv.DisableJetStream() + return + } + case <-qch: return } @@ -8732,6 +8767,10 @@ func (fs *fileStore) _writeFullState(force bool) error { // Protect with dios. <-dios err := os.WriteFile(fn, buf, defaultFilePerms) + // if file system is not writable os.IsPermission is set to true + if err != nil && os.IsPermission(err) { + return err + } dios <- struct{}{} // Update dirty if successful. diff --git a/server/filestore_test.go b/server/filestore_test.go index f78b69a391..8ce1684ec6 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -28,6 +28,7 @@ import ( "errors" "fmt" "io" + "io/fs" "math/bits" "math/rand" "os" @@ -144,9 +145,9 @@ func TestFileStoreBasics(t *testing.T) { func TestFileStoreMsgHeaders(t *testing.T) { testFileStoreAllPermutations(t, func(t *testing.T, fcfg FileStoreConfig) { fs, err := newFileStoreWithCreated(fcfg, StreamConfig{Name: "zzz", Storage: FileStorage}, time.Now(), prf(&fcfg), nil) + require_NoError(t, err) defer fs.Stop() - subj, hdr, msg := "foo", []byte("name:derek"), []byte("Hello World") elen := 22 + len(subj) + 4 + len(hdr) + len(msg) + 8 if sz := int(fileStoreMsgSize(subj, hdr, msg)); sz != elen { @@ -8483,3 +8484,80 @@ func TestFileStoreMessageTTLRecoveredSingleMessageWithoutStreamState(t *testing. require_Equal(t, ss.Msgs, 0) }) } +func TestStoreRawMessageThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond} + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + cfg) + + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("Z"), 1024) + directory := fs.fcfg.StoreDir + ORIGINAL_FILE_MODE , _ := os.Stat(directory) + READONLY_MODE := os.FileMode(0o555) + changeDirectoryPermission(directory, READONLY_MODE) + require_NoError(t, err) + totalMsgs := 10000 + i:=0 + for i = 0; i < totalMsgs; i++ { + _, _, err = fs.StoreMsg("ev.1", nil, msg, 0) + if err != nil { + break; + } + } + changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode()) + require_Error(t, err, os.ErrPermission) +} + +func TestWriteFullStateThrowsPermissionErrorIfFSModeReadOnly(t *testing.T) { + cfg := StreamConfig{Name: "zzz", Subjects: []string{"ev.1"}, Storage: FileStorage, MaxAge: 500 * time.Millisecond} + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + cfg) + + require_NoError(t, err) + defer fs.Stop() + + msg := bytes.Repeat([]byte("Z"), 1024) + directory := fs.fcfg.StoreDir + ORIGINAL_FILE_MODE , _ := os.Stat(directory) + READONLY_MODE := os.FileMode(0o555) + require_NoError(t, err) + totalMsgs := 10000 + i:=0 + for i = 0; i < totalMsgs; i++ { + _, _, err = fs.StoreMsg("ev.1", nil, msg, 0) + if err != nil { + break; + } + } + changeDirectoryPermission(directory, READONLY_MODE) + err = fs.writeFullState() + changeDirectoryPermission(directory, ORIGINAL_FILE_MODE.Mode()) + require_Error(t, err, os.ErrPermission) +} + +func changeDirectoryPermission(directory string, mode fs.FileMode) error { + err := filepath.Walk(directory, func(path string, info os.FileInfo, err error) error { + if err != nil { + return fmt.Errorf("error accessing path %q: %w", path, err) + } + + // Check if the path is a directory or file and set permissions accordingly + if info.IsDir() { + err = os.Chmod(path, mode) + if err != nil { + return fmt.Errorf("error changing directory permissions for %q: %w", path, err) + } + } else { + err = os.Chmod(path, mode) + if err != nil { + return fmt.Errorf("error changing file permissions for %q: %w", path, err) + } + } + return nil + }) + return err +} diff --git a/server/jetstream.go b/server/jetstream.go index a0b31cd638..1ba3342408 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -3034,3 +3034,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) { cfg.Duplicates = 0 } } + +func (s *Server) handleWritePermissionError() { + //TODO Check if we should add s.jetStreamOOSPending in condition + if s.JetStreamEnabled() { + s.Errorf("file system permission denied while writing, disabling jetstream") + + go s.DisableJetStream() + + //TODO Send respective advisory if needed, same as in handleOutOfSpace + } +} diff --git a/server/opts.go b/server/opts.go index 172377253e..41b660008f 100644 --- a/server/opts.go +++ b/server/opts.go @@ -6086,4 +6086,4 @@ func expandPath(p string) (string, error) { } return filepath.Join(home, p[1:]), nil -} +} \ No newline at end of file diff --git a/server/raft.go b/server/raft.go index c1102937bf..233ecabe69 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3978,6 +3978,10 @@ func (n *raft) setWriteErrLocked(err error) { n.error("Critical write error: %v", err) n.werr = err + if os.IsPermission(err) { + go n.s.handleWritePermissionError() + } + if isOutOfSpaceErr(err) { // For now since this can be happening all under the covers, we will call up and disable JetStream. go n.s.handleOutOfSpace(nil) diff --git a/server/stream.go b/server/stream.go index ce23941eb0..56ab950ad4 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5074,6 +5074,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } if err != nil { + if os.IsPermission(err){ + mset.mu.Unlock() + // messages in block cache could be lost in the worst case. + // In the clustered mode it is very highly unlikely as a result of replication. + mset.srv.DisableJetStream() + mset.srv.Warnf("file system permission denied while writing msg, disabling jetstream: %v", err) + return err + } // If we did not succeed put those values back and increment clfs in case we are clustered. var state StreamState mset.store.FastState(&state)