Skip to content

Commit

Permalink
Shutdown jetstream when os returns permission error during write
Browse files Browse the repository at this point in the history
Signed-off-by: Sourabh Agrawal <[email protected]>
  • Loading branch information
souravagrawal committed Dec 22, 2024
1 parent 87e32fe commit 835c8bd
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 11 deletions.
56 changes: 46 additions & 10 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type FileStoreConfig struct {
Cipher StoreCipher
// Compression is the algorithm to use when compressing.
Compression StoreCompression
// Allows disabling jetstream when fs is not writable
JetStreamDisableOnDiskError bool

// Internal reference to our server.
srv *Server
Expand Down Expand Up @@ -496,7 +498,14 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim

// Do age checks too, make sure to call in place.
if fs.cfg.MaxAge != 0 {
fs.expireMsgsOnRecover()
err := fs.expireMsgsOnRecover()
if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError {
fs.srv.Warnf("Permission issue detected while expiring msgs, disabling jetstream: %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
return nil, err
}
fs.startAgeChk()
}

Expand Down Expand Up @@ -1979,9 +1988,9 @@ func (fs *fileStore) recoverMsgs() error {
// We will treat this differently in case we have a recovery
// that will expire alot of messages on startup.
// Should only be called on startup.
func (fs *fileStore) expireMsgsOnRecover() {
func (fs *fileStore) expireMsgsOnRecover() error {
if fs.state.Msgs == 0 {
return
return nil
}

var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge)
Expand All @@ -1993,7 +2002,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
// usually taken care of by fs.removeMsgBlock() but we do not call that here.
var last msgId

deleteEmptyBlock := func(mb *msgBlock) {
deleteEmptyBlock := func(mb *msgBlock) error {
// If we are the last keep state to remember first/last sequence.
// Do this part by hand since not deleting one by one.
if mb == fs.lmb {
Expand All @@ -2009,8 +2018,12 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
return true
})
mb.dirtyCloseWithRemove(true)
err := mb.dirtyCloseWithRemove(true)
if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{
return err
}
deleted++
return nil
}

for _, mb := range fs.blks {
Expand All @@ -2024,7 +2037,10 @@ func (fs *fileStore) expireMsgsOnRecover() {
if mb.last.ts <= minAge {
purged += mb.msgs
bytes += mb.bytes
deleteEmptyBlock(mb)
err := deleteEmptyBlock(mb)
if err != nil && err == errFileSystemPermissionDenied && fs.fcfg.JetStreamDisableOnDiskError{
return err
}
mb.mu.Unlock()
continue
}
Expand Down Expand Up @@ -2149,6 +2165,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
if purged > 0 {
fs.dirty++
}
return nil
}

func copyMsgBlocks(src []*msgBlock) []*msgBlock {
Expand Down Expand Up @@ -3576,6 +3593,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
dios <- struct{}{}

if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError {
return nil, err
}
if err != nil {
mb.dirtyCloseWithRemove(true)
return nil, fmt.Errorf("Error creating msg block file: %v", err)
Expand Down Expand Up @@ -6428,6 +6448,7 @@ var (
errNoMainKey = errors.New("encrypted store encountered with no main key")
errNoBlkData = errors.New("message block data missing")
errStateTooBig = errors.New("store state too big for optional write")
errFileSystemPermissionDenied = errors.New("storage directory not writeable")
)

const (
Expand Down Expand Up @@ -7894,9 +7915,9 @@ func (mb *msgBlock) dirtyClose() {
}

// Should be called with lock held.
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error {
if mb == nil {
return
return nil
}
// Stop cache expiration timer.
if mb.ctmr != nil {
Expand All @@ -7918,13 +7939,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
// Clear any tracking by subject if we are removing.
mb.fss = nil
if mb.mfn != _EMPTY_ {
os.Remove(mb.mfn)
err := os.Remove(mb.mfn)
if err != nil && os.IsPermission(err) && mb.fs.fcfg.JetStreamDisableOnDiskError{
return errFileSystemPermissionDenied
}
mb.mfn = _EMPTY_
}
if mb.kfn != _EMPTY_ {
os.Remove(mb.kfn)
err := os.Remove(mb.kfn)
if err != nil && os.IsPermission(err) && mb.fs.fcfg.JetStreamDisableOnDiskError{
return errFileSystemPermissionDenied
}
}
}
return nil
}

// Remove a seq from the fss and select new first.
Expand Down Expand Up @@ -8545,6 +8573,14 @@ func (fs *fileStore) _writeFullState(force bool) error {
// Protect with dios.
<-dios
err := os.WriteFile(fn, buf, defaultFilePerms)
// if file system is not writable os.IsPermission is set to true
if err != nil && os.IsPermission(err) && fs.fcfg.JetStreamDisableOnDiskError {
fs.warn("permission error while flushing stream state to disk, disabling jetstream %v", err)
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
fs.srv.DisableJetStream()
return err
}
dios <- struct{}{}

// Update dirty if successful.
Expand Down
7 changes: 6 additions & 1 deletion server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ type Options struct {
JetStreamMaxCatchup int64
JetStreamRequestQueueLimit int64
StreamMaxBufferedMsgs int `json:"-"`
JetStreamDisableOnDiskError bool `json:"-"`
StreamMaxBufferedSize int64 `json:"-"`
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
Expand Down Expand Up @@ -2445,6 +2446,10 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("Expected a parseable size for %q, got %v", mk, mv)}
}
opts.JetStreamRequestQueueLimit = lim
case "disable_js_on_disk_error":
if v, ok := mv.(bool); ok {
opts.JetStreamDisableOnDiskError = v
}
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down Expand Up @@ -6086,4 +6091,4 @@ func expandPath(p string) (string, error) {
}

return filepath.Join(home, p[1:]), nil
}
}
9 changes: 9 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
fsCfg.SyncInterval = s.getOpts().SyncInterval
fsCfg.SyncAlways = s.getOpts().SyncAlways
fsCfg.Compression = config.Compression
fsCfg.JetStreamDisableOnDiskError = s.getOpts().JetStreamDisableOnDiskError

if err := mset.setupStore(fsCfg); err != nil {
mset.stop(true, false)
Expand Down Expand Up @@ -5013,6 +5014,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
err = store.StoreRawMsg(subject, hdr, msg, seq, ts)
}
if err != nil && os.IsPermission(err) && mset.srv.getOpts().JetStreamDisableOnDiskError {
mset.mu.Unlock()
// messages in block cache could be lost in the worst case.
// In the clustered mode it is very highly unlikely as a result of replication.
mset.srv.DisableJetStream()
mset.srv.Warnf("permission denied while writing msg, disabling jetstream: %v", err)
return err
}

if err != nil {
// If we did not succeed put those values back and increment clfs in case we are clustered.
Expand Down

0 comments on commit 835c8bd

Please sign in to comment.