Skip to content

Commit

Permalink
Merge pull request #545 from iotaledger/fix/bucket-storage-reopening
Browse files Browse the repository at this point in the history
Fix KVStore reopening of BucketManager's DBInstances.
  • Loading branch information
alexsporn authored Nov 24, 2023
2 parents fb7c74f + 57dfde1 commit 390f13f
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 101 deletions.
52 changes: 41 additions & 11 deletions pkg/storage/database/db_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/syncutils"
)

type DBInstance struct {
Expand All @@ -16,7 +17,7 @@ type DBInstance struct {
isShutdown atomic.Bool
}

func NewDBInstance(dbConfig Config) *DBInstance {
func NewDBInstance(dbConfig Config, openedCallback func(d *DBInstance)) *DBInstance {
db, err := StoreWithDefaultSettings(dbConfig.Directory, true, dbConfig.Engine)
if err != nil {
panic(err)
Expand All @@ -26,7 +27,28 @@ func NewDBInstance(dbConfig Config) *DBInstance {
dbConfig: dbConfig,
}

lockableKVStore := newLockedKVStore(db, dbInstance)
// Create a storeInstanceMutex that will be used to lock access to Open() method.
// This allows us to avoid contention upon write-locking access to the KVStore upon all operations.
storeInstanceMutex := new(syncutils.Mutex)

// lockedKVStore and the underlying openableKVStore don't handle opening and closing the underlying store by themselves,
// but delegate that to the entity that constructed it. It needs to be done like that, because opening and closing the underlying store also
// modifies the state of the DBInstance. Other methods (e.g. Flush) that don't modify the state of DBInstance can be handled directly.
lockableKVStore := newLockedKVStore(db, func() {
if dbInstance.isClosed.Load() {
storeInstanceMutex.Lock()
defer storeInstanceMutex.Unlock()

if dbInstance.isClosed.Load() {
dbInstance.Open()
if openedCallback != nil {
openedCallback(dbInstance)
}
}
}
}, func() {
dbInstance.CloseWithoutLocking()
})

dbInstance.store = lockableKVStore

Expand All @@ -42,6 +64,10 @@ func NewDBInstance(dbConfig Config) *DBInstance {

dbInstance.healthTracker = storeHealthTracker

if openedCallback != nil {
openedCallback(dbInstance)
}

return dbInstance
}

Expand All @@ -52,17 +78,17 @@ func (d *DBInstance) Shutdown() {
}

func (d *DBInstance) Flush() {
d.store.Lock()
defer d.store.Unlock()
d.store.LockAccess()
defer d.store.UnlockAccess()

if !d.isClosed.Load() {
_ = d.store.instance().Flush()
}
}

func (d *DBInstance) Close() {
d.store.Lock()
defer d.store.Unlock()
d.store.LockAccess()
defer d.store.UnlockAccess()

d.CloseWithoutLocking()
}
Expand All @@ -73,7 +99,11 @@ func (d *DBInstance) CloseWithoutLocking() {
panic(err)
}

if err := FlushAndClose(d.store); err != nil {
if err := d.store.topParent().storeInstance.Flush(); err != nil {
panic(err)
}

if err := d.store.topParent().storeInstance.Close(); err != nil {
panic(err)
}

Expand Down Expand Up @@ -101,12 +131,12 @@ func (d *DBInstance) Open() {
}
}

func (d *DBInstance) Lock() {
d.store.Lock()
func (d *DBInstance) LockAccess() {
d.store.LockAccess()
}

func (d *DBInstance) Unlock() {
d.store.Unlock()
func (d *DBInstance) UnlockAccess() {
d.store.UnlockAccess()
}

func (d *DBInstance) KVStore() kvstore.KVStore {
Expand Down
79 changes: 37 additions & 42 deletions pkg/storage/database/lockedkvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package database

import (
"github.com/iotaledger/hive.go/ds/types"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/hive.go/serializer/v2/byteutils"
Expand All @@ -11,27 +10,27 @@ import (
type lockedKVStore struct {
*openableKVStore

instanceMutex *syncutils.RWMutex
accessMutex *syncutils.RWMutex
}

func newLockedKVStore(storeInstance kvstore.KVStore, dbInstance *DBInstance) *lockedKVStore {
func newLockedKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func(), closeStore func()) *lockedKVStore {
return &lockedKVStore{
openableKVStore: newOpenableKVStore(storeInstance, dbInstance),
instanceMutex: new(syncutils.RWMutex),
openableKVStore: newOpenableKVStore(storeInstance, openStoreIfNecessary, closeStore),
accessMutex: new(syncutils.RWMutex),
}
}

func (s *lockedKVStore) Lock() {
s.instanceMutex.Lock()
func (s *lockedKVStore) LockAccess() {
s.accessMutex.Lock()
}

func (s *lockedKVStore) Unlock() {
s.instanceMutex.Unlock()
func (s *lockedKVStore) UnlockAccess() {
s.accessMutex.Unlock()
}

func (s *lockedKVStore) WithRealm(realm kvstore.Realm) (kvstore.KVStore, error) {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.withRealm(realm)
}
Expand All @@ -44,76 +43,76 @@ func (s *lockedKVStore) withRealm(realm kvstore.Realm) (kvstore.KVStore, error)
dbPrefix: realm,
},

instanceMutex: s.instanceMutex,
accessMutex: s.accessMutex,
}, nil
}

func (s *lockedKVStore) WithExtendedRealm(realm kvstore.Realm) (kvstore.KVStore, error) {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.withRealm(s.buildKeyPrefix(realm))
}

func (s *lockedKVStore) Iterate(prefix kvstore.KeyPrefix, kvConsumerFunc kvstore.IteratorKeyValueConsumerFunc, direction ...kvstore.IterDirection) error {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.openableKVStore.Iterate(prefix, kvConsumerFunc, direction...)
}

func (s *lockedKVStore) IterateKeys(prefix kvstore.KeyPrefix, consumerFunc kvstore.IteratorKeyConsumerFunc, direction ...kvstore.IterDirection) error {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.openableKVStore.IterateKeys(prefix, consumerFunc, direction...)
}

func (s *lockedKVStore) Clear() error {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.openableKVStore.Clear()
}

func (s *lockedKVStore) Get(key kvstore.Key) (value kvstore.Value, err error) {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.openableKVStore.Get(key)
}

func (s *lockedKVStore) Set(key kvstore.Key, value kvstore.Value) error {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.openableKVStore.Set(key, value)
}

func (s *lockedKVStore) Has(key kvstore.Key) (bool, error) {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.openableKVStore.Has(key)
}

func (s *lockedKVStore) Delete(key kvstore.Key) error {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.openableKVStore.Delete(key)
}

func (s *lockedKVStore) DeletePrefix(prefix kvstore.KeyPrefix) error {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.openableKVStore.DeletePrefix(prefix)
}

func (s *lockedKVStore) Flush() error {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.FlushWithoutLocking()
}
Expand All @@ -123,12 +122,8 @@ func (s *lockedKVStore) FlushWithoutLocking() error {
}

func (s *lockedKVStore) Close() error {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()

if err := s.FlushWithoutLocking(); err != nil {
return ierrors.Wrap(err, "failed to flush database")
}
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return s.CloseWithoutLocking()
}
Expand All @@ -138,8 +133,8 @@ func (s *lockedKVStore) CloseWithoutLocking() error {
}

func (s *lockedKVStore) Batched() (kvstore.BatchedMutations, error) {
s.instanceMutex.RLock()
defer s.instanceMutex.RUnlock()
s.accessMutex.RLock()
defer s.accessMutex.RUnlock()

return &syncedBatchedMutations{
openableKVStoreBatchedMutations: &openableKVStoreBatchedMutations{
Expand All @@ -165,8 +160,8 @@ type syncedBatchedMutations struct {
}

func (s *syncedBatchedMutations) Commit() error {
s.parentStore.instanceMutex.RLock()
defer s.parentStore.instanceMutex.RUnlock()
s.parentStore.accessMutex.RLock()
defer s.parentStore.accessMutex.RUnlock()

return s.openableKVStoreBatchedMutations.Commit()
}
30 changes: 15 additions & 15 deletions pkg/storage/database/openablekvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,20 @@ import (
)

type openableKVStore struct {
dbInstance *DBInstance
storeInstance kvstore.KVStore // KVStore that is used to access the DB instance
parentStore *openableKVStore
dbPrefix kvstore.KeyPrefix
openIfNecessary func() // openIfNecessary callback should synchronize itself and make sure that storeInstance is ready to use after.
closeStore func()
storeInstance kvstore.KVStore // storeInstance is a KVStore that is holding the reference to the underlying database.
parentStore *openableKVStore
dbPrefix kvstore.KeyPrefix
}

func newOpenableKVStore(storeInstance kvstore.KVStore, dbInstance *DBInstance) *openableKVStore {
func newOpenableKVStore(storeInstance kvstore.KVStore, openStoreIfNecessary func(), closeStore func()) *openableKVStore {
return &openableKVStore{
dbInstance: dbInstance,
storeInstance: storeInstance,
parentStore: nil,
dbPrefix: kvstore.EmptyPrefix,
openIfNecessary: openStoreIfNecessary,
closeStore: closeStore,
storeInstance: storeInstance,
parentStore: nil,
dbPrefix: kvstore.EmptyPrefix,
}
}

Expand All @@ -36,10 +38,8 @@ func (s *openableKVStore) topParent() *openableKVStore {

func (s *openableKVStore) instance() kvstore.KVStore {
parent := s.topParent()

if parent.dbInstance.isClosed.Load() {
parent.dbInstance.Open()
}
// openIfNecessary callback should synchronize itself and make sure that storeInstance is ready to use after.
parent.openIfNecessary()

return parent.storeInstance
}
Expand All @@ -60,7 +60,6 @@ func (s *openableKVStore) WithRealm(realm kvstore.Realm) (kvstore.KVStore, error

func (s *openableKVStore) withRealm(realm kvstore.Realm) (kvstore.KVStore, error) {
return &openableKVStore{
dbInstance: nil,
storeInstance: nil,
parentStore: s,
dbPrefix: realm,
Expand Down Expand Up @@ -116,7 +115,8 @@ func (s *openableKVStore) Flush() error {
}

func (s *openableKVStore) Close() error {
s.topParent().dbInstance.CloseWithoutLocking()
s.topParent().closeStore()

return nil
}

Expand Down
9 changes: 0 additions & 9 deletions pkg/storage/database/utils.go

This file was deleted.

9 changes: 4 additions & 5 deletions pkg/storage/permanent/permanent.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func New(dbConfig database.Config, errorHandler func(error), opts ...options.Opt
errorHandler: errorHandler,
dbConfig: dbConfig,
}, opts, func(p *Permanent) {
p.store = database.NewDBInstance(p.dbConfig)
// openedCallback is nil because we don't need to do anything upon reopening
p.store = database.NewDBInstance(p.dbConfig, nil)
p.settings = NewSettings(lo.PanicOnErr(p.store.KVStore().WithExtendedRealm(kvstore.Realm{settingsPrefix})), p.optsEpochBasedProvider...)
p.commitments = NewCommitments(lo.PanicOnErr(p.store.KVStore().WithExtendedRealm(kvstore.Realm{commitmentsPrefix})), p.settings.APIProvider())
p.utxoLedger = utxoledger.New(lo.PanicOnErr(p.store.KVStore().WithExtendedRealm(kvstore.Realm{ledgerPrefix})), p.settings.APIProvider())
Expand All @@ -52,17 +53,15 @@ func New(dbConfig database.Config, errorHandler func(error), opts ...options.Opt
}

func Clone(source *Permanent, dbConfig database.Config, errorHandler func(error), opts ...options.Option[Permanent]) (*Permanent, error) {
source.store.Lock()
defer source.store.Unlock()
source.store.LockAccess()
defer source.store.UnlockAccess()

source.store.CloseWithoutLocking()

if err := copydir.Copy(source.dbConfig.Directory, dbConfig.Directory); err != nil {
return nil, ierrors.Wrap(err, "failed to copy permanent storage directory to new storage path")
}

source.store.Open()

return New(dbConfig, errorHandler, opts...), nil
}

Expand Down
Loading

0 comments on commit 390f13f

Please sign in to comment.