Skip to content

Commit

Permalink
Merge pull request #595 from iotaledger/fix/bucketmanager-deadlock
Browse files Browse the repository at this point in the history
Fix BucketManager deadlock and shutdown issue
  • Loading branch information
alexsporn authored Dec 4, 2023
2 parents e8f363c + a86b55e commit fc84c91
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 41 deletions.
26 changes: 16 additions & 10 deletions pkg/storage/database/db_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,23 @@ func NewDBInstance(dbConfig Config, openedCallback func(d *DBInstance)) *DBInsta
// lockedKVStore and the underlying openableKVStore don't handle opening and closing the underlying store by themselves,
// but delegate that to the entity that constructed them. This is necessary because opening and closing the underlying store also
// modify the state of the DBInstance. Other methods (e.g. Flush) that don't modify the state of DBInstance can be handled directly.
lockableKVStore := newLockedKVStore(db, func() {
lockableKVStore := newLockedKVStore(db, func() error {
if dbInstance.isClosed.Load() {
storeInstanceMutex.Lock()
defer storeInstanceMutex.Unlock()

if dbInstance.isClosed.Load() {
dbInstance.Open()
if err := dbInstance.Open(); err != nil {
return err
}

if openedCallback != nil {
openedCallback(dbInstance)
}
}
}

return nil
}, func() {
dbInstance.CloseWithoutLocking()
})
Expand All @@ -64,10 +69,6 @@ func NewDBInstance(dbConfig Config, openedCallback func(d *DBInstance)) *DBInsta

dbInstance.healthTracker = storeHealthTracker

if openedCallback != nil {
openedCallback(dbInstance)
}

return dbInstance
}

Expand All @@ -82,7 +83,9 @@ func (d *DBInstance) Flush() {
defer d.store.UnlockAccess()

if !d.isClosed.Load() {
_ = d.store.instance().Flush()
if instance, err := d.store.instance(); err == nil {
_ = instance.Flush()
}
}
}

Expand Down Expand Up @@ -113,22 +116,25 @@ func (d *DBInstance) CloseWithoutLocking() {

// Open re-opens a closed DBInstance. It must only be called while holding a lock on DBInstance,
// otherwise it might cause a race condition and corruption of the node's state.
func (d *DBInstance) Open() {
func (d *DBInstance) Open() error {
if !d.isClosed.Load() {
panic("cannot open DBInstance that is not closed")
panic(ErrDatabaseNotClosed)
}

if d.isShutdown.Load() {
panic("cannot open DBInstance that is shutdown")
return ErrDatabaseShutdown
}

d.store.Replace(lo.PanicOnErr(StoreWithDefaultSettings(d.dbConfig.Directory, false, d.dbConfig.Engine)))

d.isClosed.Store(false)

if err := d.healthTracker.MarkCorrupted(); err != nil {
// panic immediately as in this case the database state is corrupted
panic(err)
}

return nil
}

func (d *DBInstance) LockAccess() {
Expand Down
8 changes: 5 additions & 3 deletions pkg/storage/database/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package database
import "github.com/iotaledger/hive.go/ierrors"

var (
ErrEpochPruned = ierrors.New("epoch pruned")
ErrNoPruningNeeded = ierrors.New("no pruning needed")
ErrDatabaseFull = ierrors.New("database full")
ErrEpochPruned = ierrors.New("epoch pruned")
ErrNoPruningNeeded = ierrors.New("no pruning needed")
ErrDatabaseFull = ierrors.New("database full")
ErrDatabaseShutdown = ierrors.New("cannot open DBInstance that is shutdown")
ErrDatabaseNotClosed = ierrors.New("cannot open DBInstance that is not closed")
)
2 changes: 1 addition & 1 deletion pkg/storage/database/lockedkvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type lockedKVStore struct {
accessMutex *syncutils.StarvingMutex
}

func newLockedKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func(), closeStore func()) *lockedKVStore {
func newLockedKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func() error, closeStore func()) *lockedKVStore {
return &lockedKVStore{
openableKVStore: newOpenableKVStore(storeInstance, openStoreIfNecessary, closeStore),
accessMutex: syncutils.NewStarvingMutex(),
Expand Down
82 changes: 67 additions & 15 deletions pkg/storage/database/openablekvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
)

type openableKVStore struct {
openIfNecessary func() // openIfNecessary callback should synchronize itself and make sure that storeInstance is ready to use after.
openIfNecessary func() error // openIfNecessary callback should synchronize itself and make sure that storeInstance is ready to use after.
closeStore func()
storeInstance kvstore.KVStore // storeInstance is a KVStore that is holding the reference to the underlying database.
parentStore *openableKVStore
dbPrefix kvstore.KeyPrefix
}

func newOpenableKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func(), closeStore func()) *openableKVStore {
func newOpenableKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func() error, closeStore func()) *openableKVStore {
return &openableKVStore{
openIfNecessary: openStoreIfNecessary,
closeStore: closeStore,
Expand All @@ -36,12 +36,14 @@ func (s *openableKVStore) topParent() *openableKVStore {
return current
}

func (s *openableKVStore) instance() kvstore.KVStore {
func (s *openableKVStore) instance() (kvstore.KVStore, error) {
parent := s.topParent()
// openIfNecessary callback should synchronize itself and make sure that storeInstance is ready to use after.
parent.openIfNecessary()
if err := parent.openIfNecessary(); err != nil {
return nil, err
}

return parent.storeInstance
return parent.storeInstance, nil
}

func (s *openableKVStore) Replace(newKVStore kvstore.KVStore) {
Expand Down Expand Up @@ -75,43 +77,88 @@ func (s *openableKVStore) Realm() kvstore.Realm {
}

func (s *openableKVStore) Iterate(prefix kvstore.KeyPrefix, kvConsumerFunc kvstore.IteratorKeyValueConsumerFunc, direction ...kvstore.IterDirection) error {
return s.instance().Iterate(s.buildKeyPrefix(prefix), func(key kvstore.Key, value kvstore.Value) bool {
instance, err := s.instance()
if err != nil {
return err
}

return instance.Iterate(s.buildKeyPrefix(prefix), func(key kvstore.Key, value kvstore.Value) bool {
return kvConsumerFunc(utils.CopyBytes(key)[len(s.dbPrefix):], value)
}, direction...)
}

func (s *openableKVStore) IterateKeys(prefix kvstore.KeyPrefix, consumerFunc kvstore.IteratorKeyConsumerFunc, direction ...kvstore.IterDirection) error {
return s.instance().IterateKeys(s.buildKeyPrefix(prefix), func(key kvstore.Key) bool {
instance, err := s.instance()
if err != nil {
return err
}

return instance.IterateKeys(s.buildKeyPrefix(prefix), func(key kvstore.Key) bool {
return consumerFunc(utils.CopyBytes(key)[len(s.dbPrefix):])
}, direction...)
}

func (s *openableKVStore) Clear() error {
return s.instance().DeletePrefix(s.dbPrefix)
instance, err := s.instance()
if err != nil {
return err
}

return instance.DeletePrefix(s.dbPrefix)
}

func (s *openableKVStore) Get(key kvstore.Key) (value kvstore.Value, err error) {
return s.instance().Get(byteutils.ConcatBytes(s.dbPrefix, key))
instance, err := s.instance()
if err != nil {
return nil, err
}

return instance.Get(byteutils.ConcatBytes(s.dbPrefix, key))
}

func (s *openableKVStore) Set(key kvstore.Key, value kvstore.Value) error {
return s.instance().Set(byteutils.ConcatBytes(s.dbPrefix, key), value)
instance, err := s.instance()
if err != nil {
return err
}

return instance.Set(byteutils.ConcatBytes(s.dbPrefix, key), value)
}

func (s *openableKVStore) Has(key kvstore.Key) (bool, error) {
return s.instance().Has(byteutils.ConcatBytes(s.dbPrefix, key))
instance, err := s.instance()
if err != nil {
return false, err
}

return instance.Has(byteutils.ConcatBytes(s.dbPrefix, key))
}

func (s *openableKVStore) Delete(key kvstore.Key) error {
return s.instance().Delete(byteutils.ConcatBytes(s.dbPrefix, key))
instance, err := s.instance()
if err != nil {
return err
}

return instance.Delete(byteutils.ConcatBytes(s.dbPrefix, key))
}

func (s *openableKVStore) DeletePrefix(prefix kvstore.KeyPrefix) error {
return s.instance().DeletePrefix(s.buildKeyPrefix(prefix))
instance, err := s.instance()
if err != nil {
return err
}

return instance.DeletePrefix(s.buildKeyPrefix(prefix))
}

func (s *openableKVStore) Flush() error {
return s.instance().Flush()
instance, err := s.instance()
if err != nil {
return err
}

return instance.Flush()
}

func (s *openableKVStore) Close() error {
Expand Down Expand Up @@ -175,7 +222,12 @@ func (s *openableKVStoreBatchedMutations) Cancel() {
}

func (s *openableKVStoreBatchedMutations) Commit() error {
batched, err := s.parentStore.instance().Batched()
instance, err := s.parentStore.instance()
if err != nil {
return err
}

batched, err := instance.Batched()
if err != nil {
return err
}
Expand Down
40 changes: 28 additions & 12 deletions pkg/storage/prunable/bucket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,21 +205,37 @@ func (b *BucketManager) getDBInstance(epoch iotago.EpochIndex) *database.DBInsta
b.mutex.RLock()
defer b.mutex.RUnlock()

// check if exists again, as another goroutine might have created it in parallel
db := lo.Return1(b.openDBs.GetOrCreate(epoch, func() *database.DBInstance {
db := database.NewDBInstance(b.dbConfig.WithDirectory(dbPathFromIndex(b.dbConfig.Directory, epoch)), func(d *database.DBInstance) {
b.openDBsCacheMutex.Lock()
defer b.openDBsCacheMutex.Unlock()
// Try to retrieve the DBInstance from cache in the first step.
// This puts the DBInstance at the front of the LRU cache.
b.openDBsCacheMutex.Lock()
if db, exists := b.openDBsCache.Get(epoch); exists {
b.openDBsCacheMutex.Unlock()

// Mark the db as used in the cache.
b.openDBsCache.Put(epoch, d)
return db
}
b.openDBsCacheMutex.Unlock()

// Remove the cached db size since we will open the db.
b.dbSizes.Delete(epoch)
})
openedCallback := func(d *database.DBInstance) {
b.openDBsCacheMutex.Lock()
defer b.openDBsCacheMutex.Unlock()

return db
}))
// Mark the db as used in the cache.
b.openDBsCache.Put(epoch, d)

// Remove the cached db size since we will open the db.
b.dbSizes.Delete(epoch)
}

// check if exists again, as another goroutine might have created it in parallel
db, created := b.openDBs.GetOrCreate(epoch, func() *database.DBInstance {
return database.NewDBInstance(b.dbConfig.WithDirectory(dbPathFromIndex(b.dbConfig.Directory, epoch)), openedCallback)
})

if created {
// Call openedCallback here instead of inside NewDBInstance to avoid a deadlock due to
// locking `b.openDBs` and `b.openDBsCacheMutex` in different order.
openedCallback(db)
}

return db
}
Expand Down

0 comments on commit fc84c91

Please sign in to comment.