From 94654a0bec7a7fe15f482ca22d603880b323e19d Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Mon, 4 Dec 2023 16:01:16 +0100 Subject: [PATCH 1/2] Fix deadlock on DBInstance due to locking in different order. --- pkg/storage/database/db_instance.go | 4 --- pkg/storage/prunable/bucket_manager.go | 40 ++++++++++++++++++-------- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/pkg/storage/database/db_instance.go b/pkg/storage/database/db_instance.go index 3421c464c..543cf1825 100644 --- a/pkg/storage/database/db_instance.go +++ b/pkg/storage/database/db_instance.go @@ -64,10 +64,6 @@ func NewDBInstance(dbConfig Config, openedCallback func(d *DBInstance)) *DBInsta dbInstance.healthTracker = storeHealthTracker - if openedCallback != nil { - openedCallback(dbInstance) - } - return dbInstance } diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index 78e7920fa..3539fb388 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -205,21 +205,37 @@ func (b *BucketManager) getDBInstance(epoch iotago.EpochIndex) *database.DBInsta b.mutex.RLock() defer b.mutex.RUnlock() - // check if exists again, as another goroutine might have created it in parallel - db := lo.Return1(b.openDBs.GetOrCreate(epoch, func() *database.DBInstance { - db := database.NewDBInstance(b.dbConfig.WithDirectory(dbPathFromIndex(b.dbConfig.Directory, epoch)), func(d *database.DBInstance) { - b.openDBsCacheMutex.Lock() - defer b.openDBsCacheMutex.Unlock() + // Try to retrieve the DBInstance from cache in the first step. + // This puts the DBInstance at the front of the LRU cache. + b.openDBsCacheMutex.Lock() + if db, exists := b.openDBsCache.Get(epoch); exists { + b.openDBsCacheMutex.Unlock() - // Mark the db as used in the cache. - b.openDBsCache.Put(epoch, d) + return db + } + b.openDBsCacheMutex.Unlock() - // Remove the cached db size since we will open the db. - b.dbSizes.Delete(epoch) - }) + openedCallback := func(d *database.DBInstance) { + b.openDBsCacheMutex.Lock() + defer b.openDBsCacheMutex.Unlock() - return db - })) + // Mark the db as used in the cache. + b.openDBsCache.Put(epoch, d) + + // Remove the cached db size since we will open the db. + b.dbSizes.Delete(epoch) + } + + // check if exists again, as another goroutine might have created it in parallel + db, created := b.openDBs.GetOrCreate(epoch, func() *database.DBInstance { + return database.NewDBInstance(b.dbConfig.WithDirectory(dbPathFromIndex(b.dbConfig.Directory, epoch)), openedCallback) + }) + + if created { + // Call openedCallback here instead of inside NewDBInstance to avoid a deadlock due to + // locking `b.openDBs` and `b.openDBsCacheMutex` in different order. + openedCallback(db) + } return db } From a86b55eecacb073d31aa015c30fe94d1d0cd2868 Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Mon, 4 Dec 2023 16:01:24 +0100 Subject: [PATCH 2/2] KVStore returns error when trying to access a shutdown DBInstance --- pkg/storage/database/db_instance.go | 22 +++++-- pkg/storage/database/errors.go | 8 ++- pkg/storage/database/lockedkvstore.go | 2 +- pkg/storage/database/openablekvstore.go | 82 ++++++++++++++++++++----- 4 files changed, 89 insertions(+), 25 deletions(-) diff --git a/pkg/storage/database/db_instance.go b/pkg/storage/database/db_instance.go index 543cf1825..36eae9c11 100644 --- a/pkg/storage/database/db_instance.go +++ b/pkg/storage/database/db_instance.go @@ -34,18 +34,23 @@ func NewDBInstance(dbConfig Config, openedCallback func(d *DBInstance)) *DBInsta // lockedKVStore and the underlying openableKVStore don't handle opening and closing the underlying store by themselves, // but delegate that to the entity that constructed it. It needs to be done like that, because opening and closing the underlying store also // modifies the state of the DBInstance. Other methods (e.g. Flush) that don't modify the state of DBInstance can be handled directly. - lockableKVStore := newLockedKVStore(db, func() { + lockableKVStore := newLockedKVStore(db, func() error { if dbInstance.isClosed.Load() { storeInstanceMutex.Lock() defer storeInstanceMutex.Unlock() if dbInstance.isClosed.Load() { - dbInstance.Open() + if err := dbInstance.Open(); err != nil { + return err + } + if openedCallback != nil { openedCallback(dbInstance) } } } + + return nil }, func() { dbInstance.CloseWithoutLocking() }) @@ -78,7 +83,9 @@ func (d *DBInstance) Flush() { defer d.store.UnlockAccess() if !d.isClosed.Load() { - _ = d.store.instance().Flush() + if instance, err := d.store.instance(); err == nil { + _ = instance.Flush() + } } } @@ -109,13 +116,13 @@ func (d *DBInstance) CloseWithoutLocking() { // Open re-opens a closed DBInstance. It must only be called while holding a lock on DBInstance, // otherwise it might cause a race condition and corruption of node's state. -func (d *DBInstance) Open() { +func (d *DBInstance) Open() error { if !d.isClosed.Load() { - panic("cannot open DBInstance that is not closed") + panic(ErrDatabaseNotClosed) } if d.isShutdown.Load() { - panic("cannot open DBInstance that is shutdown") + return ErrDatabaseShutdown } d.store.Replace(lo.PanicOnErr(StoreWithDefaultSettings(d.dbConfig.Directory, false, d.dbConfig.Engine))) @@ -123,8 +130,11 @@ func (d *DBInstance) Open() { d.isClosed.Store(false) if err := d.healthTracker.MarkCorrupted(); err != nil { + // panic immediately as in this case the database state is corrupted panic(err) } + + return nil } func (d *DBInstance) LockAccess() { diff --git a/pkg/storage/database/errors.go b/pkg/storage/database/errors.go index bb2af0d7a..a471a2369 100644 --- a/pkg/storage/database/errors.go +++ b/pkg/storage/database/errors.go @@ -3,7 +3,9 @@ package database import "github.com/iotaledger/hive.go/ierrors" var ( - ErrEpochPruned = ierrors.New("epoch pruned") - ErrNoPruningNeeded = ierrors.New("no pruning needed") - ErrDatabaseFull = ierrors.New("database full") + ErrEpochPruned = ierrors.New("epoch pruned") + ErrNoPruningNeeded = ierrors.New("no pruning needed") + ErrDatabaseFull = ierrors.New("database full") + ErrDatabaseShutdown = ierrors.New("cannot open DBInstance that is shutdown") + ErrDatabaseNotClosed = ierrors.New("cannot open DBInstance that is not closed") ) diff --git a/pkg/storage/database/lockedkvstore.go b/pkg/storage/database/lockedkvstore.go index 1fbdcd36c..efa70af43 100644 --- a/pkg/storage/database/lockedkvstore.go +++ b/pkg/storage/database/lockedkvstore.go @@ -13,7 +13,7 @@ type lockedKVStore struct { accessMutex *syncutils.StarvingMutex } -func newLockedKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func(), closeStore func()) *lockedKVStore { +func newLockedKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func() error, closeStore func()) *lockedKVStore { return &lockedKVStore{ openableKVStore: newOpenableKVStore(storeInstance, openStoreIfNecessary, closeStore), accessMutex: syncutils.NewStarvingMutex(), diff --git a/pkg/storage/database/openablekvstore.go b/pkg/storage/database/openablekvstore.go index 80902c972..1e3fc6bed 100644 --- a/pkg/storage/database/openablekvstore.go +++ b/pkg/storage/database/openablekvstore.go @@ -10,14 +10,14 @@ import ( ) type openableKVStore struct { - openIfNecessary func() // openIfNecessary callback should synchronize itself and make sure that storeInstance is ready to use after. + openIfNecessary func() error // openIfNecessary callback should synchronize itself and make sure that storeInstance is ready to use after. closeStore func() storeInstance kvstore.KVStore // storeInstance is a KVStore that is holding the reference to the underlying database. parentStore *openableKVStore dbPrefix kvstore.KeyPrefix } -func newOpenableKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func(), closeStore func()) *openableKVStore { +func newOpenableKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func() error, closeStore func()) *openableKVStore { return &openableKVStore{ openIfNecessary: openStoreIfNecessary, closeStore: closeStore, @@ -36,12 +36,14 @@ func (s *openableKVStore) topParent() *openableKVStore { return current } -func (s *openableKVStore) instance() kvstore.KVStore { +func (s *openableKVStore) instance() (kvstore.KVStore, error) { parent := s.topParent() // openIfNecessary callback should synchronize itself and make sure that storeInstance is ready to use after. - parent.openIfNecessary() + if err := parent.openIfNecessary(); err != nil { + return nil, err + } - return parent.storeInstance + return parent.storeInstance, nil } func (s *openableKVStore) Replace(newKVStore kvstore.KVStore) { @@ -75,43 +77,88 @@ func (s *openableKVStore) Realm() kvstore.Realm { } func (s *openableKVStore) Iterate(prefix kvstore.KeyPrefix, kvConsumerFunc kvstore.IteratorKeyValueConsumerFunc, direction ...kvstore.IterDirection) error { - return s.instance().Iterate(s.buildKeyPrefix(prefix), func(key kvstore.Key, value kvstore.Value) bool { + instance, err := s.instance() + if err != nil { + return err + } + + return instance.Iterate(s.buildKeyPrefix(prefix), func(key kvstore.Key, value kvstore.Value) bool { return kvConsumerFunc(utils.CopyBytes(key)[len(s.dbPrefix):], value) }, direction...) } func (s *openableKVStore) IterateKeys(prefix kvstore.KeyPrefix, consumerFunc kvstore.IteratorKeyConsumerFunc, direction ...kvstore.IterDirection) error { - return s.instance().IterateKeys(s.buildKeyPrefix(prefix), func(key kvstore.Key) bool { + instance, err := s.instance() + if err != nil { + return err + } + + return instance.IterateKeys(s.buildKeyPrefix(prefix), func(key kvstore.Key) bool { return consumerFunc(utils.CopyBytes(key)[len(s.dbPrefix):]) }, direction...) } func (s *openableKVStore) Clear() error { - return s.instance().DeletePrefix(s.dbPrefix) + instance, err := s.instance() + if err != nil { + return err + } + + return instance.DeletePrefix(s.dbPrefix) } func (s *openableKVStore) Get(key kvstore.Key) (value kvstore.Value, err error) { - return s.instance().Get(byteutils.ConcatBytes(s.dbPrefix, key)) + instance, err := s.instance() + if err != nil { + return nil, err + } + + return instance.Get(byteutils.ConcatBytes(s.dbPrefix, key)) } func (s *openableKVStore) Set(key kvstore.Key, value kvstore.Value) error { - return s.instance().Set(byteutils.ConcatBytes(s.dbPrefix, key), value) + instance, err := s.instance() + if err != nil { + return err + } + + return instance.Set(byteutils.ConcatBytes(s.dbPrefix, key), value) } func (s *openableKVStore) Has(key kvstore.Key) (bool, error) { - return s.instance().Has(byteutils.ConcatBytes(s.dbPrefix, key)) + instance, err := s.instance() + if err != nil { + return false, err + } + + return instance.Has(byteutils.ConcatBytes(s.dbPrefix, key)) } func (s *openableKVStore) Delete(key kvstore.Key) error { - return s.instance().Delete(byteutils.ConcatBytes(s.dbPrefix, key)) + instance, err := s.instance() + if err != nil { + return err + } + + return instance.Delete(byteutils.ConcatBytes(s.dbPrefix, key)) } func (s *openableKVStore) DeletePrefix(prefix kvstore.KeyPrefix) error { - return s.instance().DeletePrefix(s.buildKeyPrefix(prefix)) + instance, err := s.instance() + if err != nil { + return err + } + + return instance.DeletePrefix(s.buildKeyPrefix(prefix)) } func (s *openableKVStore) Flush() error { - return s.instance().Flush() + instance, err := s.instance() + if err != nil { + return err + } + + return instance.Flush() } func (s *openableKVStore) Close() error { @@ -175,7 +222,12 @@ func (s *openableKVStoreBatchedMutations) Cancel() { } func (s *openableKVStoreBatchedMutations) Commit() error { - batched, err := s.parentStore.instance().Batched() + instance, err := s.parentStore.instance() + if err != nil { + return err + } + + batched, err := instance.Batched() if err != nil { return err }