From ac7553313025554867afbb1d623408ab7785d168 Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Wed, 22 Nov 2023 14:08:37 +0100 Subject: [PATCH 1/2] Fix KVStore reopening of BucketManager's DBInstances. --- pkg/storage/database/openablekvstore.go | 3 +++ pkg/storage/prunable/bucket_manager.go | 34 +++++++++++++++++++++++++ pkg/storage/prunable/prunable.go | 6 ++--- 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/pkg/storage/database/openablekvstore.go b/pkg/storage/database/openablekvstore.go index 35e4cd818..9bfa6a599 100644 --- a/pkg/storage/database/openablekvstore.go +++ b/pkg/storage/database/openablekvstore.go @@ -38,6 +38,9 @@ func (s *openableKVStore) instance() kvstore.KVStore { parent := s.topParent() if parent.dbInstance.isClosed.Load() { + parent.dbInstance.Lock() + defer parent.dbInstance.Unlock() + parent.dbInstance.Open() } diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index a76a96259..b2a5b96aa 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -70,6 +70,40 @@ func (b *BucketManager) Get(epoch iotago.EpochIndex, realm kvstore.Realm) (kvsto return lo.PanicOnErr(kv.WithExtendedRealm(realm)), nil } +func (b *BucketManager) Lock() { + // Lock b.mutex so that a new DBInstance is not created + b.mutex.Lock() + // Lock b.openDBsCacheMutex so that DBInstance is not retrieved from cache + b.openDBsCacheMutex.Lock() + + // Lock all KVStores so that they can't be reopened by components that store references to them (e.g., StateDiff) + b.openDBs.ForEach(func(epoch iotago.EpochIndex, db *database.DBInstance) bool { + db.Lock() + + return true + }) +} + +func (b *BucketManager) Unlock() { + b.mutex.Unlock() + b.openDBsCacheMutex.Unlock() + + b.openDBs.ForEach(func(epoch iotago.EpochIndex, db *database.DBInstance) bool { + db.Unlock() + + return true + }) +} + +func (b *BucketManager) CloseWithoutLocking() { + b.openDBs.ForEach(func(epoch iotago.EpochIndex, db *database.DBInstance) bool { + db.CloseWithoutLocking() + b.openDBsCache.Remove(epoch) + + return true + }) +} + func (b *BucketManager) Shutdown() { b.openDBsCacheMutex.Lock() defer b.openDBsCacheMutex.Unlock() diff --git a/pkg/storage/prunable/prunable.go b/pkg/storage/prunable/prunable.go index 680c1f0e6..cb147346c 100644 --- a/pkg/storage/prunable/prunable.go +++ b/pkg/storage/prunable/prunable.go @@ -53,12 +53,12 @@ func Clone(source *Prunable, dbConfig database.Config, apiProvider iotago.APIPro source.semiPermanentDB.Lock() defer source.semiPermanentDB.Unlock() - source.prunableSlotStore.mutex.Lock() - defer source.prunableSlotStore.mutex.Unlock() + source.prunableSlotStore.Lock() + defer source.prunableSlotStore.Unlock() // Close forked prunable storage before copying its contents. source.semiPermanentDB.CloseWithoutLocking() - source.prunableSlotStore.Shutdown() + source.prunableSlotStore.CloseWithoutLocking() // Copy the storage on disk to new location. if err := copydir.Copy(source.prunableSlotStore.dbConfig.Directory, dbConfig.Directory); err != nil { From 17a4459c6f17620cf603b3c79f0353c071c58833 Mon Sep 17 00:00:00 2001 From: Piotr Macek <4007944+piotrm50@users.noreply.github.com> Date: Fri, 24 Nov 2023 10:01:27 +0100 Subject: [PATCH 2/2] Fix KVStore reopening of BucketManager's DBInstances. --- pkg/storage/database/db_instance.go | 52 ++++++++++++---- pkg/storage/database/lockedkvstore.go | 79 ++++++++++++------------- pkg/storage/database/openablekvstore.go | 33 +++++------ pkg/storage/database/utils.go | 9 --- pkg/storage/permanent/permanent.go | 9 ++- pkg/storage/prunable/bucket_manager.go | 32 +++++----- pkg/storage/prunable/prunable.go | 11 ++-- 7 files changed, 118 insertions(+), 107 deletions(-) delete mode 100644 pkg/storage/database/utils.go diff --git a/pkg/storage/database/db_instance.go b/pkg/storage/database/db_instance.go index 63ef9a41e..3421c464c 100644 --- a/pkg/storage/database/db_instance.go +++ b/pkg/storage/database/db_instance.go @@ -6,6 +6,7 @@ import ( "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/hive.go/lo" + "github.com/iotaledger/hive.go/runtime/syncutils" ) type DBInstance struct { @@ -16,7 +17,7 @@ type DBInstance struct { isShutdown atomic.Bool } -func NewDBInstance(dbConfig Config) *DBInstance { +func NewDBInstance(dbConfig Config, openedCallback func(d *DBInstance)) *DBInstance { db, err := StoreWithDefaultSettings(dbConfig.Directory, true, dbConfig.Engine) if err != nil { panic(err) @@ -26,7 +27,28 @@ func NewDBInstance(dbConfig Config) *DBInstance { dbConfig: dbConfig, } - lockableKVStore := newLockedKVStore(db, dbInstance) + // Create a storeInstanceMutex that will be used to lock access to Open() method. + // This allows us to avoid contention upon write-locking access to the KVStore upon all operations. + storeInstanceMutex := new(syncutils.Mutex) + + // lockedKVStore and the underlying openableKVStore don't handle opening and closing the underlying store by themselves, + // but delegate that to the entity that constructed it. It needs to be done like that, because opening and closing the underlying store also + // modifies the state of the DBInstance. Other methods (e.g. Flush) that don't modify the state of DBInstance can be handled directly. + lockableKVStore := newLockedKVStore(db, func() { + if dbInstance.isClosed.Load() { + storeInstanceMutex.Lock() + defer storeInstanceMutex.Unlock() + + if dbInstance.isClosed.Load() { + dbInstance.Open() + if openedCallback != nil { + openedCallback(dbInstance) + } + } + } + }, func() { + dbInstance.CloseWithoutLocking() + }) dbInstance.store = lockableKVStore @@ -42,6 +64,10 @@ func NewDBInstance(dbConfig Config) *DBInstance { dbInstance.healthTracker = storeHealthTracker + if openedCallback != nil { + openedCallback(dbInstance) + } + return dbInstance } @@ -52,8 +78,8 @@ func (d *DBInstance) Shutdown() { } func (d *DBInstance) Flush() { - d.store.Lock() - defer d.store.Unlock() + d.store.LockAccess() + defer d.store.UnlockAccess() if !d.isClosed.Load() { _ = d.store.instance().Flush() @@ -61,8 +87,8 @@ func (d *DBInstance) Flush() { } func (d *DBInstance) Close() { - d.store.Lock() - defer d.store.Unlock() + d.store.LockAccess() + defer d.store.UnlockAccess() d.CloseWithoutLocking() } @@ -73,7 +99,11 @@ func (d *DBInstance) CloseWithoutLocking() { panic(err) } - if err := FlushAndClose(d.store); err != nil { + if err := d.store.topParent().storeInstance.Flush(); err != nil { + panic(err) + } + + if err := d.store.topParent().storeInstance.Close(); err != nil { panic(err) } @@ -101,12 +131,12 @@ func (d *DBInstance) Open() { } } -func (d *DBInstance) Lock() { - d.store.Lock() +func (d *DBInstance) LockAccess() { + d.store.LockAccess() } -func (d *DBInstance) Unlock() { - d.store.Unlock() +func (d *DBInstance) UnlockAccess() { + d.store.UnlockAccess() } func (d *DBInstance) KVStore() kvstore.KVStore { diff --git a/pkg/storage/database/lockedkvstore.go b/pkg/storage/database/lockedkvstore.go index c76601853..61f14e6d7 100644 --- a/pkg/storage/database/lockedkvstore.go +++ b/pkg/storage/database/lockedkvstore.go @@ -2,7 +2,6 @@ package database import ( "github.com/iotaledger/hive.go/ds/types" - "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/hive.go/runtime/syncutils" "github.com/iotaledger/hive.go/serializer/v2/byteutils" @@ -11,27 +10,27 @@ import ( type lockedKVStore struct { *openableKVStore - instanceMutex *syncutils.RWMutex + accessMutex *syncutils.RWMutex } -func newLockedKVStore(storeInstance kvstore.KVStore, dbInstance *DBInstance) *lockedKVStore { +func newLockedKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func(), closeStore func()) *lockedKVStore { return &lockedKVStore{ - openableKVStore: newOpenableKVStore(storeInstance, dbInstance), - instanceMutex: new(syncutils.RWMutex), + openableKVStore: newOpenableKVStore(storeInstance, openStoreIfNecessary, closeStore), + accessMutex: new(syncutils.RWMutex), } } -func (s *lockedKVStore) Lock() { - s.instanceMutex.Lock() +func (s *lockedKVStore) LockAccess() { + s.accessMutex.Lock() } -func (s *lockedKVStore) Unlock() { - s.instanceMutex.Unlock() +func (s *lockedKVStore) UnlockAccess() { + s.accessMutex.Unlock() } func (s *lockedKVStore) WithRealm(realm kvstore.Realm) (kvstore.KVStore, error) { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.withRealm(realm) } @@ -44,76 +43,76 @@ func (s *lockedKVStore) withRealm(realm kvstore.Realm) (kvstore.KVStore, error) dbPrefix: realm, }, - instanceMutex: s.instanceMutex, + accessMutex: s.accessMutex, }, nil } func (s *lockedKVStore) WithExtendedRealm(realm kvstore.Realm) (kvstore.KVStore, error) { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.withRealm(s.buildKeyPrefix(realm)) } func (s *lockedKVStore) Iterate(prefix kvstore.KeyPrefix, kvConsumerFunc kvstore.IteratorKeyValueConsumerFunc, direction ...kvstore.IterDirection) error { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.openableKVStore.Iterate(prefix, kvConsumerFunc, direction...) } func (s *lockedKVStore) IterateKeys(prefix kvstore.KeyPrefix, consumerFunc kvstore.IteratorKeyConsumerFunc, direction ...kvstore.IterDirection) error { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.openableKVStore.IterateKeys(prefix, consumerFunc, direction...) } func (s *lockedKVStore) Clear() error { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.openableKVStore.Clear() } func (s *lockedKVStore) Get(key kvstore.Key) (value kvstore.Value, err error) { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.openableKVStore.Get(key) } func (s *lockedKVStore) Set(key kvstore.Key, value kvstore.Value) error { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.openableKVStore.Set(key, value) } func (s *lockedKVStore) Has(key kvstore.Key) (bool, error) { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.openableKVStore.Has(key) } func (s *lockedKVStore) Delete(key kvstore.Key) error { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.openableKVStore.Delete(key) } func (s *lockedKVStore) DeletePrefix(prefix kvstore.KeyPrefix) error { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.openableKVStore.DeletePrefix(prefix) } func (s *lockedKVStore) Flush() error { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.FlushWithoutLocking() } @@ -123,12 +122,8 @@ func (s *lockedKVStore) FlushWithoutLocking() error { } func (s *lockedKVStore) Close() error { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() - - if err := s.FlushWithoutLocking(); err != nil { - return ierrors.Wrap(err, "failed to flush database") - } + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return s.CloseWithoutLocking() } @@ -138,8 +133,8 @@ func (s *lockedKVStore) CloseWithoutLocking() error { } func (s *lockedKVStore) Batched() (kvstore.BatchedMutations, error) { - s.instanceMutex.RLock() - defer s.instanceMutex.RUnlock() + s.accessMutex.RLock() + defer s.accessMutex.RUnlock() return &syncedBatchedMutations{ openableKVStoreBatchedMutations: &openableKVStoreBatchedMutations{ @@ -165,8 +160,8 @@ type syncedBatchedMutations struct { } func (s *syncedBatchedMutations) Commit() error { - s.parentStore.instanceMutex.RLock() - defer s.parentStore.instanceMutex.RUnlock() + s.parentStore.accessMutex.RLock() + defer s.parentStore.accessMutex.RUnlock() return s.openableKVStoreBatchedMutations.Commit() } diff --git a/pkg/storage/database/openablekvstore.go b/pkg/storage/database/openablekvstore.go index 9bfa6a599..80902c972 100644 --- a/pkg/storage/database/openablekvstore.go +++ b/pkg/storage/database/openablekvstore.go @@ -10,18 +10,20 @@ import ( ) type openableKVStore struct { - dbInstance *DBInstance - storeInstance kvstore.KVStore // KVStore that is used to access the DB instance - parentStore *openableKVStore - dbPrefix kvstore.KeyPrefix + openIfNecessary func() // openIfNecessary callback should synchronize itself and make sure that storeInstance is ready to use after. + closeStore func() + storeInstance kvstore.KVStore // storeInstance is a KVStore that is holding the reference to the underlying database. + parentStore *openableKVStore + dbPrefix kvstore.KeyPrefix } -func newOpenableKVStore(storeInstance kvstore.KVStore, dbInstance *DBInstance) *openableKVStore { +func newOpenableKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func(), closeStore func()) *openableKVStore { return &openableKVStore{ - dbInstance: dbInstance, - storeInstance: storeInstance, - parentStore: nil, - dbPrefix: kvstore.EmptyPrefix, + openIfNecessary: openStoreIfNecessary, + closeStore: closeStore, + storeInstance: storeInstance, + parentStore: nil, + dbPrefix: kvstore.EmptyPrefix, } } @@ -36,13 +38,8 @@ func (s *openableKVStore) topParent() *openableKVStore { func (s *openableKVStore) instance() kvstore.KVStore { parent := s.topParent() - - if parent.dbInstance.isClosed.Load() { - parent.dbInstance.Lock() - defer parent.dbInstance.Unlock() - - parent.dbInstance.Open() - } + // openIfNecessary callback should synchronize itself and make sure that storeInstance is ready to use after. + parent.openIfNecessary() return parent.storeInstance } @@ -63,7 +60,6 @@ func (s *openableKVStore) WithRealm(realm kvstore.Realm) (kvstore.KVStore, error func (s *openableKVStore) withRealm(realm kvstore.Realm) (kvstore.KVStore, error) { return &openableKVStore{ - dbInstance: nil, storeInstance: nil, parentStore: s, dbPrefix: realm, @@ -119,7 +115,8 @@ func (s *openableKVStore) Flush() error { } func (s *openableKVStore) Close() error { - s.topParent().dbInstance.CloseWithoutLocking() + s.topParent().closeStore() + return nil } diff --git a/pkg/storage/database/utils.go b/pkg/storage/database/utils.go deleted file mode 100644 index eaded98c8..000000000 --- a/pkg/storage/database/utils.go +++ /dev/null @@ -1,9 +0,0 @@ -package database - -func FlushAndClose(store *lockedKVStore) error { - if err := store.instance().Flush(); err != nil { - return err - } - - return store.instance().Close() -} diff --git a/pkg/storage/permanent/permanent.go b/pkg/storage/permanent/permanent.go index fd576d9c6..886ada1cb 100644 --- a/pkg/storage/permanent/permanent.go +++ b/pkg/storage/permanent/permanent.go @@ -43,7 +43,8 @@ func New(dbConfig database.Config, errorHandler func(error), opts ...options.Opt errorHandler: errorHandler, dbConfig: dbConfig, }, opts, func(p *Permanent) { - p.store = database.NewDBInstance(p.dbConfig) + // openedCallback is nil because we don't need to do anything upon reopening + p.store = database.NewDBInstance(p.dbConfig, nil) p.settings = NewSettings(lo.PanicOnErr(p.store.KVStore().WithExtendedRealm(kvstore.Realm{settingsPrefix})), p.optsEpochBasedProvider...) p.commitments = NewCommitments(lo.PanicOnErr(p.store.KVStore().WithExtendedRealm(kvstore.Realm{commitmentsPrefix})), p.settings.APIProvider()) p.utxoLedger = utxoledger.New(lo.PanicOnErr(p.store.KVStore().WithExtendedRealm(kvstore.Realm{ledgerPrefix})), p.settings.APIProvider()) @@ -53,8 +54,8 @@ func New(dbConfig database.Config, errorHandler func(error), opts ...options.Opt } func Clone(source *Permanent, dbConfig database.Config, errorHandler func(error), opts ...options.Option[Permanent]) (*Permanent, error) { - source.store.Lock() - defer source.store.Unlock() + source.store.LockAccess() + defer source.store.UnlockAccess() source.store.CloseWithoutLocking() @@ -62,8 +63,6 @@ func Clone(source *Permanent, dbConfig database.Config, errorHandler func(error) return nil, ierrors.Wrap(err, "failed to copy permanent storage directory to new storage path") } - source.store.Open() - return New(dbConfig, errorHandler, opts...), nil } diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index b2a5b96aa..7a0a08b9e 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -76,23 +76,23 @@ func (b *BucketManager) Lock() { // Lock b.openDBsCacheMutex so that DBInstance is not retrieved from cache b.openDBsCacheMutex.Lock() - // Lock all KVStores so that they can't be reopened by components that store references to them (e.g., StateDiff) + // Lock access to all KVStores so that they can't be reopened by components that store references to them (e.g., StateDiff) b.openDBs.ForEach(func(epoch iotago.EpochIndex, db *database.DBInstance) bool { - db.Lock() + db.LockAccess() return true }) } func (b *BucketManager) Unlock() { - b.mutex.Unlock() - b.openDBsCacheMutex.Unlock() - b.openDBs.ForEach(func(epoch iotago.EpochIndex, db *database.DBInstance) bool { - db.Unlock() + db.UnlockAccess() return true }) + + b.openDBsCacheMutex.Unlock() + b.mutex.Unlock() } func (b *BucketManager) CloseWithoutLocking() { @@ -205,22 +205,22 @@ func (b *BucketManager) getDBInstance(epoch iotago.EpochIndex) *database.DBInsta b.mutex.RLock() defer b.mutex.RUnlock() - b.openDBsCacheMutex.Lock() - defer b.openDBsCacheMutex.Unlock() - - // check if exists again, as other goroutine might have created it in parallel + // check if exists again, as another goroutine might have created it in parallel db := lo.Return1(b.openDBs.GetOrCreate(epoch, func() *database.DBInstance { - db := database.NewDBInstance(b.dbConfig.WithDirectory(dbPathFromIndex(b.dbConfig.Directory, epoch))) + db := database.NewDBInstance(b.dbConfig.WithDirectory(dbPathFromIndex(b.dbConfig.Directory, epoch)), func(d *database.DBInstance) { + b.openDBsCacheMutex.Lock() + defer b.openDBsCacheMutex.Unlock() - // Remove the cached db size since we will open the db - b.dbSizes.Delete(epoch) + // Mark the db as used in the cache. + b.openDBsCache.Put(epoch, d) + + // Remove the cached db size since we will open the db. + b.dbSizes.Delete(epoch) + }) return db })) - // Mark the db as used in the cache - b.openDBsCache.Put(epoch, db) - return db } diff --git a/pkg/storage/prunable/prunable.go b/pkg/storage/prunable/prunable.go index cb147346c..595a1c573 100644 --- a/pkg/storage/prunable/prunable.go +++ b/pkg/storage/prunable/prunable.go @@ -32,7 +32,8 @@ type Prunable struct { func New(dbConfig database.Config, apiProvider iotago.APIProvider, errorHandler func(error), opts ...options.Option[BucketManager]) *Prunable { dir := utils.NewDirectory(dbConfig.Directory, true) semiPermanentDBConfig := dbConfig.WithDirectory(dir.PathWithCreate("semipermanent")) - semiPermanentDB := database.NewDBInstance(semiPermanentDBConfig) + // openedCallback is nil because we don't need to do anything when reopening the store. + semiPermanentDB := database.NewDBInstance(semiPermanentDBConfig, nil) return &Prunable{ apiProvider: apiProvider, @@ -50,13 +51,13 @@ func New(dbConfig database.Config, apiProvider iotago.APIProvider, errorHandler func Clone(source *Prunable, dbConfig database.Config, apiProvider iotago.APIProvider, errorHandler func(error), opts ...options.Option[BucketManager]) (*Prunable, error) { // Lock semi-permanent DB and prunable slot store so that nobody can try to use or open them while cloning. - source.semiPermanentDB.Lock() - defer source.semiPermanentDB.Unlock() + source.semiPermanentDB.LockAccess() + defer source.semiPermanentDB.UnlockAccess() source.prunableSlotStore.Lock() defer source.prunableSlotStore.Unlock() - // Close forked prunable storage before copying its contents. + // Close forked prunable storage before copying its contents. All necessary locks are already acquired. source.semiPermanentDB.CloseWithoutLocking() source.prunableSlotStore.CloseWithoutLocking() @@ -65,8 +66,6 @@ func Clone(source *Prunable, dbConfig database.Config, apiProvider iotago.APIPro return nil, ierrors.Wrap(err, "failed to copy prunable storage directory to new storage path") } - source.semiPermanentDB.Open() - return New(dbConfig, apiProvider, errorHandler, opts...), nil }