From 3e12dc4691e4d8102fa11499d699b3adbb12fc7d Mon Sep 17 00:00:00 2001 From: Andrea V <1577639+karimodm@users.noreply.github.com> Date: Wed, 15 Nov 2023 12:22:02 +0100 Subject: [PATCH 01/13] Fix attestation collection count type --- pkg/protocol/engine/attestation/slotattestation/manager.go | 6 ++++-- pkg/protocol/engine/attestation/slotattestation/snapshot.go | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/protocol/engine/attestation/slotattestation/manager.go b/pkg/protocol/engine/attestation/slotattestation/manager.go index ad73fde40..7cf4d45bf 100644 --- a/pkg/protocol/engine/attestation/slotattestation/manager.go +++ b/pkg/protocol/engine/attestation/slotattestation/manager.go @@ -329,8 +329,10 @@ func (m *Manager) Reset() { } func (m *Manager) computeAttestationCommitmentOffset(slot iotago.SlotIndex) (cutoffSlot iotago.SlotIndex, isValid bool) { - if slot < m.apiProvider.APIForSlot(slot).ProtocolParameters().MaxCommittableAge() { - return 0, false + protocolParams := m.apiProvider.APIForSlot(slot).ProtocolParameters() + + if slot < protocolParams.GenesisSlot()+protocolParams.MaxCommittableAge() { + return protocolParams.GenesisSlot(), false } return slot - m.apiProvider.APIForSlot(slot).ProtocolParameters().MaxCommittableAge(), true diff --git a/pkg/protocol/engine/attestation/slotattestation/snapshot.go b/pkg/protocol/engine/attestation/slotattestation/snapshot.go index 0b4b3c14e..5823ead14 100644 --- a/pkg/protocol/engine/attestation/slotattestation/snapshot.go +++ b/pkg/protocol/engine/attestation/slotattestation/snapshot.go @@ -51,7 +51,7 @@ func (m *Manager) Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex) err } if _, isValid := m.computeAttestationCommitmentOffset(targetSlot); !isValid { - if err := stream.Write(writer, uint64(0)); err != nil { + if err := stream.Write(writer, uint32(0)); err != nil { return ierrors.Wrap(err, "failed to write 0 attestation count") } From 85d1d62a792a4be99497b512e5144a46377df54b Mon Sep 17 00:00:00 2001 From: Andrea V <1577639+karimodm@users.noreply.github.com> Date: Wed, 15 Nov 2023 12:22:15 +0100 Subject: [PATCH 02/13] Spice things up by having Docker genesisSlot other than 0 --- tools/genesis-snapshot/presets/presets.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/genesis-snapshot/presets/presets.go b/tools/genesis-snapshot/presets/presets.go index 5c81cdaa7..f44f7a0a9 100644 --- a/tools/genesis-snapshot/presets/presets.go +++ b/tools/genesis-snapshot/presets/presets.go @@ -133,7 +133,7 @@ var Docker = []options.Option[snapshotcreator.Options]{ iotago.NewV3ProtocolParameters( iotago.WithNetworkOptions("docker", "rms"), iotago.WithSupplyOptions(4_600_000_000_000_000, 1, 1, 10, 100, 100, 100), - iotago.WithTimeProviderOptions(0, time.Now().Unix(), 10, 13), + iotago.WithTimeProviderOptions(5, time.Now().Unix(), 10, 13), iotago.WithLivenessOptions(30, 30, 7, 14, 30), // increase/decrease threshold = fraction * slotDurationInSeconds * schedulerRate iotago.WithCongestionControlOptions(500, 500, 500, 800000, 500000, 100000, 1000, 100), From 02f27647f61b4c68fd0dbea5217563ef0ae43191 Mon Sep 17 00:00:00 2001 From: Andrea V <1577639+karimodm@users.noreply.github.com> Date: Wed, 15 Nov 2023 12:33:39 +0100 Subject: [PATCH 03/13] Use serializer type instead --- pkg/protocol/engine/attestation/slotattestation/snapshot.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/protocol/engine/attestation/slotattestation/snapshot.go b/pkg/protocol/engine/attestation/slotattestation/snapshot.go index 5823ead14..af034322c 100644 --- a/pkg/protocol/engine/attestation/slotattestation/snapshot.go +++ b/pkg/protocol/engine/attestation/slotattestation/snapshot.go @@ -15,7 +15,6 @@ func (m *Manager) Import(reader io.ReadSeeker) error { var attestations []*iotago.Attestation if err := stream.ReadCollection(reader, serializer.SeriLengthPrefixTypeAsUint32, func(i int) error { - attestation, err := stream.ReadObjectWithSize[*iotago.Attestation](reader, serializer.SeriLengthPrefixTypeAsUint16, iotago.AttestationFromBytes(m.apiProvider)) if err != nil { return ierrors.Wrapf(err, "failed to read attestation %d", i) @@ -51,7 +50,9 @@ func (m *Manager) Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex) err } if _, isValid := m.computeAttestationCommitmentOffset(targetSlot); !isValid { - if err := stream.Write(writer, uint32(0)); err != nil { + if err := stream.WriteCollection(writer, serializer.SeriLengthPrefixTypeAsUint32, func() (int, error) { + return 0, nil + }); err != nil { return ierrors.Wrap(err, "failed to write 0 attestation count") } From 1fd43c8f5e6e883dee149f40ebaca658c4b2897a Mon Sep 17 00:00:00 2001 From: Andrea V <1577639+karimodm@users.noreply.github.com> Date: Wed, 15 Nov 2023 13:46:05 +0100 Subject: [PATCH 04/13] WIP: closed prunable storage fix --- .../scheduler/drr/scheduler.go | 5 +++- .../scheduler/passthrough/scheduler.go | 2 ++ pkg/storage/database/db_instance.go | 23 ++++++++++++++----- pkg/storage/database/lockedkvstore.go | 4 ++-- pkg/storage/database/openablekvstore.go | 11 ++++++++- pkg/storage/prunable/bucket_manager.go | 1 + 6 files changed, 36 insertions(+), 10 deletions(-) diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index 6b4a6458b..f4bba78bf 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -143,6 +143,8 @@ func (s *Scheduler) Shutdown() { s.bufferMutex.Lock() defer s.bufferMutex.Unlock() + s.TriggerShutdown() + // validator workers need to be shut down first, otherwise they will hang on the shutdown channel. s.validatorBuffer.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool { s.shutdownValidatorQueue(validatorQueue) @@ -152,9 +154,10 @@ func (s *Scheduler) Shutdown() { s.validatorBuffer.Clear() close(s.shutdownSignal) - s.TriggerStopped() s.workersWg.Wait() + + s.TriggerStopped() } // Start starts the scheduler. diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go index f85b9fd21..2b8f9183e 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go @@ -36,6 +36,8 @@ func New() *Scheduler { } func (s *Scheduler) Shutdown() { + s.TriggerShutdown() + s.TriggerStopped() } func (s *Scheduler) IsBlockIssuerReady(_ iotago.AccountID, _ ...*blocks.Block) bool { diff --git a/pkg/storage/database/db_instance.go b/pkg/storage/database/db_instance.go index 14cdabeaf..d2597b06a 100644 --- a/pkg/storage/database/db_instance.go +++ b/pkg/storage/database/db_instance.go @@ -1,6 +1,8 @@ package database import ( + "sync/atomic" + "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/hive.go/lo" @@ -10,6 +12,7 @@ type DBInstance struct { store *lockedKVStore // KVStore that is used to access the DB instance healthTracker *kvstore.StoreHealthTracker dbConfig Config + isClosed atomic.Bool } func NewDBInstance(dbConfig Config) *DBInstance { @@ -18,7 +21,13 @@ func NewDBInstance(dbConfig Config) *DBInstance { panic(err) } - lockableKVStore := newLockedKVStore(db) + dbInstance := &DBInstance{ + dbConfig: dbConfig, + } + + lockableKVStore := newLockedKVStore(db, dbInstance) + + dbInstance.store = lockableKVStore // HealthTracker state is only modified while holding the lock on the lockableKVStore; // that's why it needs to use openableKVStore (which does not lock) instead of lockableKVStore to avoid a deadlock. @@ -30,11 +39,9 @@ func NewDBInstance(dbConfig Config) *DBInstance { panic(err) } - return &DBInstance{ - store: lockableKVStore, - healthTracker: storeHealthTracker, - dbConfig: dbConfig, - } + dbInstance.healthTracker = storeHealthTracker + + return dbInstance } func (d *DBInstance) Close() { @@ -42,6 +49,8 @@ func (d *DBInstance) Close() { defer d.store.Unlock() d.CloseWithoutLocking() + + d.isClosed.Store(true) } func (d *DBInstance) CloseWithoutLocking() { @@ -52,6 +61,8 @@ func (d *DBInstance) CloseWithoutLocking() { if err := FlushAndClose(d.store); err != nil { panic(err) } + + d.isClosed.Store(true) } // Open re-opens a closed DBInstance. It must only be called while holding a lock on DBInstance, diff --git a/pkg/storage/database/lockedkvstore.go b/pkg/storage/database/lockedkvstore.go index cb365f46e..c76601853 100644 --- a/pkg/storage/database/lockedkvstore.go +++ b/pkg/storage/database/lockedkvstore.go @@ -14,9 +14,9 @@ type lockedKVStore struct { instanceMutex *syncutils.RWMutex } -func newLockedKVStore(storeInstance kvstore.KVStore) *lockedKVStore { +func newLockedKVStore(storeInstance kvstore.KVStore, dbInstance *DBInstance) *lockedKVStore { return &lockedKVStore{ - openableKVStore: newOpenableKVStore(storeInstance), + openableKVStore: newOpenableKVStore(storeInstance, dbInstance), instanceMutex: new(syncutils.RWMutex), } } diff --git a/pkg/storage/database/openablekvstore.go b/pkg/storage/database/openablekvstore.go index 9ff04df3a..b936c0631 100644 --- a/pkg/storage/database/openablekvstore.go +++ b/pkg/storage/database/openablekvstore.go @@ -10,13 +10,15 @@ import ( ) type openableKVStore struct { + dbInstance *DBInstance storeInstance kvstore.KVStore // KVStore that is used to access the DB instance parentStore *openableKVStore dbPrefix kvstore.KeyPrefix } -func newOpenableKVStore(storeInstance kvstore.KVStore) *openableKVStore { +func newOpenableKVStore(storeInstance kvstore.KVStore, dbInstance *DBInstance) *openableKVStore { return &openableKVStore{ + dbInstance: dbInstance, storeInstance: storeInstance, parentStore: nil, dbPrefix: kvstore.EmptyPrefix, @@ -24,6 +26,10 @@ func newOpenableKVStore(storeInstance kvstore.KVStore) *openableKVStore { } func (s *openableKVStore) instance() kvstore.KVStore { + if s.dbInstance.isClosed.Load() { + s.dbInstance.Open() + } + if s.storeInstance != nil { return s.storeInstance } @@ -44,6 +50,7 @@ func (s *openableKVStore) Replace(newKVStore kvstore.KVStore) { func (s *openableKVStore) WithRealm(realm kvstore.Realm) (kvstore.KVStore, error) { return s.withRealm(realm) } + func (s *openableKVStore) withRealm(realm kvstore.Realm) (kvstore.KVStore, error) { return &openableKVStore{ storeInstance: nil, @@ -51,6 +58,7 @@ func (s *openableKVStore) withRealm(realm kvstore.Realm) (kvstore.KVStore, error dbPrefix: realm, }, nil } + func (s *openableKVStore) WithExtendedRealm(realm kvstore.Realm) (kvstore.KVStore, error) { return s.withRealm(s.buildKeyPrefix(realm)) } @@ -98,6 +106,7 @@ func (s *openableKVStore) DeletePrefix(prefix kvstore.KeyPrefix) error { func (s *openableKVStore) Flush() error { return s.instance().Flush() } + func (s *openableKVStore) Close() error { return s.instance().Close() } diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index 907fd054a..554e9e115 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -78,6 +78,7 @@ func (b *BucketManager) Shutdown() { defer b.openDBsMutex.Unlock() b.openDBs.Each(func(epoch iotago.EpochIndex, db *database.DBInstance) { + // TODO: Finally Close db.Close() b.openDBs.Remove(epoch) }) From 2393a6157bd4d41e053561f496ffcd4d921ad819 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 15 Nov 2023 15:52:47 +0100 Subject: [PATCH 05/13] Changed the BucketManager to not make closed databases reusable until Shutdown is called. --- pkg/storage/database/db_instance.go | 44 ++++++++++--- pkg/storage/database/openablekvstore.go | 22 ++++--- pkg/storage/database/utils.go | 4 +- pkg/storage/prunable/bucket_manager.go | 82 ++++++++++--------------- 4 files changed, 85 insertions(+), 67 deletions(-) diff --git a/pkg/storage/database/db_instance.go b/pkg/storage/database/db_instance.go index d2597b06a..63ef9a41e 100644 --- a/pkg/storage/database/db_instance.go +++ b/pkg/storage/database/db_instance.go @@ -13,6 +13,7 @@ type DBInstance struct { healthTracker *kvstore.StoreHealthTracker dbConfig Config isClosed atomic.Bool + isShutdown atomic.Bool } func NewDBInstance(dbConfig Config) *DBInstance { @@ -44,32 +45,57 @@ func NewDBInstance(dbConfig Config) *DBInstance { return dbInstance } +func (d *DBInstance) Shutdown() { + d.isShutdown.Store(true) + + d.Close() +} + +func (d *DBInstance) Flush() { + d.store.Lock() + defer d.store.Unlock() + + if !d.isClosed.Load() { + _ = d.store.instance().Flush() + } +} + func (d *DBInstance) Close() { d.store.Lock() defer d.store.Unlock() d.CloseWithoutLocking() - - d.isClosed.Store(true) } func (d *DBInstance) CloseWithoutLocking() { - if err := d.healthTracker.MarkHealthy(); err != nil { - panic(err) - } + if !d.isClosed.Load() { + if err := d.healthTracker.MarkHealthy(); err != nil { + panic(err) + } - if err := FlushAndClose(d.store); err != nil { - panic(err) - } + if err := FlushAndClose(d.store); err != nil { + panic(err) + } - d.isClosed.Store(true) + d.isClosed.Store(true) + } } // Open re-opens a closed DBInstance. It must only be called while holding a lock on DBInstance, // otherwise it might cause a race condition and corruption of node's state. func (d *DBInstance) Open() { + if !d.isClosed.Load() { + panic("cannot open DBInstance that is not closed") + } + + if d.isShutdown.Load() { + panic("cannot open DBInstance that is shutdown") + } + d.store.Replace(lo.PanicOnErr(StoreWithDefaultSettings(d.dbConfig.Directory, false, d.dbConfig.Engine))) + d.isClosed.Store(false) + if err := d.healthTracker.MarkCorrupted(); err != nil { panic(err) } diff --git a/pkg/storage/database/openablekvstore.go b/pkg/storage/database/openablekvstore.go index b936c0631..541d620e7 100644 --- a/pkg/storage/database/openablekvstore.go +++ b/pkg/storage/database/openablekvstore.go @@ -25,16 +25,22 @@ func newOpenableKVStore(storeInstance kvstore.KVStore, dbInstance *DBInstance) * } } -func (s *openableKVStore) instance() kvstore.KVStore { - if s.dbInstance.isClosed.Load() { - s.dbInstance.Open() +func (s *openableKVStore) topParent() *openableKVStore { + current := s + for current.parentStore != nil { + current = current.parentStore } + return current +} - if s.storeInstance != nil { - return s.storeInstance +func (s *openableKVStore) instance() kvstore.KVStore { + parent := s.topParent() + + if parent.dbInstance.isClosed.Load() { + parent.dbInstance.Open() } - return s.parentStore.instance() + return parent.storeInstance } func (s *openableKVStore) Replace(newKVStore kvstore.KVStore) { @@ -53,6 +59,7 @@ func (s *openableKVStore) WithRealm(realm kvstore.Realm) (kvstore.KVStore, error func (s *openableKVStore) withRealm(realm kvstore.Realm) (kvstore.KVStore, error) { return &openableKVStore{ + dbInstance: nil, storeInstance: nil, parentStore: s, dbPrefix: realm, @@ -108,7 +115,8 @@ func (s *openableKVStore) Flush() error { } func (s *openableKVStore) Close() error { - return s.instance().Close() + s.topParent().dbInstance.CloseWithoutLocking() + return nil } func (s *openableKVStore) Batched() (kvstore.BatchedMutations, error) { diff --git a/pkg/storage/database/utils.go b/pkg/storage/database/utils.go index 0b47cf41b..eaded98c8 100644 --- a/pkg/storage/database/utils.go +++ b/pkg/storage/database/utils.go @@ -1,9 +1,9 @@ package database func FlushAndClose(store *lockedKVStore) error { - if err := store.FlushWithoutLocking(); err != nil { + if err := store.instance().Flush(); err != nil { return err } - return store.CloseWithoutLocking() + return store.instance().Close() } diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index 554e9e115..199249ada 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -6,6 +6,7 @@ import ( "github.com/zyedidia/generic/cache" "github.com/iotaledger/hive.go/ds/shrinkingmap" + "github.com/iotaledger/hive.go/ds/types" "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/hive.go/lo" @@ -17,8 +18,8 @@ import ( ) type BucketManager struct { - openDBs *cache.Cache[iotago.EpochIndex, *database.DBInstance] - openDBsMutex syncutils.RWMutex + openDBsCache *cache.Cache[iotago.EpochIndex, types.Empty] + openDBs *shrinkingmap.ShrinkingMap[iotago.EpochIndex, *database.DBInstance] lastPrunedEpoch *model.EvictionIndex[iotago.EpochIndex] lastPrunedMutex syncutils.RWMutex @@ -38,19 +39,16 @@ func NewBucketManager(dbConfig database.Config, errorHandler func(error), opts . optsMaxOpenDBs: 5, dbConfig: dbConfig, errorHandler: errorHandler, + openDBs: shrinkingmap.New[iotago.EpochIndex, *database.DBInstance](), dbSizes: shrinkingmap.New[iotago.EpochIndex, int64](), lastPrunedEpoch: model.NewEvictionIndex[iotago.EpochIndex](), }, opts, func(m *BucketManager) { - m.openDBs = cache.New[iotago.EpochIndex, *database.DBInstance](m.optsMaxOpenDBs) - m.openDBs.SetEvictCallback(func(baseIndex iotago.EpochIndex, db *database.DBInstance) { - db.Close() - - size, err := dbPrunableDirectorySize(dbConfig.Directory, baseIndex) - if err != nil { - errorHandler(ierrors.Wrapf(err, "failed to get size of prunable directory for base index %d", baseIndex)) + // We use an LRU cache to try closing unnecessary databases. + m.openDBsCache = cache.New[iotago.EpochIndex, types.Empty](m.optsMaxOpenDBs) + m.openDBsCache.SetEvictCallback(func(baseIndex iotago.EpochIndex, _ types.Empty) { + if db, exits := m.openDBs.Get(baseIndex); exits { + db.Close() } - - m.dbSizes.Set(baseIndex, size) }) }) } @@ -74,13 +72,10 @@ func (b *BucketManager) Get(epoch iotago.EpochIndex, realm kvstore.Realm) (kvsto } func (b *BucketManager) Shutdown() { - b.openDBsMutex.Lock() - defer b.openDBsMutex.Unlock() - - b.openDBs.Each(func(epoch iotago.EpochIndex, db *database.DBInstance) { - // TODO: Finally Close - db.Close() - b.openDBs.Remove(epoch) + b.openDBs.ForEach(func(epoch iotago.EpochIndex, db *database.DBInstance) bool { + db.Shutdown() + b.openDBs.Delete(epoch) + return true }) } @@ -93,27 +88,21 @@ func (b *BucketManager) TotalSize() int64 { return true }) - b.openDBsMutex.Lock() - defer b.openDBsMutex.Unlock() - // Add up all the open databases - b.openDBs.Each(func(key iotago.EpochIndex, val *database.DBInstance) { + b.openDBs.ForEach(func(key iotago.EpochIndex, val *database.DBInstance) bool { size, err := dbPrunableDirectorySize(b.dbConfig.Directory, key) if err != nil { b.errorHandler(ierrors.Wrapf(err, "dbPrunableDirectorySize failed for key %s: %s", b.dbConfig.Directory, key)) - - return } sum += size + + return true }) return sum } func (b *BucketManager) BucketSize(epoch iotago.EpochIndex) (int64, error) { - b.openDBsMutex.RLock() - defer b.openDBsMutex.RUnlock() - size, exists := b.dbSizes.Get(epoch) if exists { return size, nil @@ -173,23 +162,23 @@ func (b *BucketManager) RestoreFromDisk() (lastPrunedEpoch iotago.EpochIndex) { // epochIndex 0 -> db 0 // epochIndex 1 -> db 1 // epochIndex 2 -> db 2 -func (b *BucketManager) getDBInstance(epoch iotago.EpochIndex) (db *database.DBInstance) { +func (b *BucketManager) getDBInstance(epoch iotago.EpochIndex) *database.DBInstance { // Lock global mutex to prevent closing and copying storage data on disk during engine switching. b.mutex.RLock() defer b.mutex.RUnlock() - b.openDBsMutex.Lock() - defer b.openDBsMutex.Unlock() - // check if exists again, as other goroutine might have created it in parallel - db, exists := b.openDBs.Get(epoch) - if !exists { - db = database.NewDBInstance(b.dbConfig.WithDirectory(dbPathFromIndex(b.dbConfig.Directory, epoch))) + db := lo.Return1(b.openDBs.GetOrCreate(epoch, func() *database.DBInstance { + db := database.NewDBInstance(b.dbConfig.WithDirectory(dbPathFromIndex(b.dbConfig.Directory, epoch))) // Remove the cached db size since we will open the db b.dbSizes.Delete(epoch) - b.openDBs.Put(epoch, db) - } + + return db + })) + + // Mark the db as used in the cache + b.openDBsCache.Put(epoch, types.Void) return db } @@ -212,9 +201,6 @@ func (b *BucketManager) Prune(epoch iotago.EpochIndex) error { // DeleteBucket deletes directory that stores the data for the given bucket and returns boolean // flag indicating whether a directory for that bucket existed. func (b *BucketManager) DeleteBucket(epoch iotago.EpochIndex) (deleted bool) { - b.openDBsMutex.Lock() - defer b.openDBsMutex.Unlock() - if exists, err := PathExists(dbPathFromIndex(b.dbConfig.Directory, epoch)); err != nil { panic(err) } else if !exists { @@ -223,8 +209,8 @@ func (b *BucketManager) DeleteBucket(epoch iotago.EpochIndex) (deleted bool) { db, exists := b.openDBs.Get(epoch) if exists { - db.Close() - b.openDBs.Remove(epoch) + db.Shutdown() + b.openDBs.Delete(epoch) } if err := os.RemoveAll(dbPathFromIndex(b.dbConfig.Directory, epoch)); err != nil { @@ -254,17 +240,15 @@ func (b *BucketManager) PruneSlots(epoch iotago.EpochIndex, pruningRange [2]iota } func (b *BucketManager) Flush() error { - b.openDBsMutex.RLock() - defer b.openDBsMutex.RUnlock() - - var err error - b.openDBs.Each(func(epoch iotago.EpochIndex, db *database.DBInstance) { - if err = db.KVStore().Flush(); err != nil { - return + var innerErr error + b.openDBs.ForEach(func(epoch iotago.EpochIndex, db *database.DBInstance) bool { + if err := db.KVStore().Flush(); err != nil { + innerErr = err } + return true }) - return err + return innerErr } func PathExists(path string) (bool, error) { From ad8718bf837d86202a1a36b6b75cd2106c190384 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 15 Nov 2023 16:00:21 +0100 Subject: [PATCH 06/13] Call flush instead of shutdown --- pkg/storage/prunable/bucket_manager.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index 199249ada..cf4f9902b 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -233,8 +233,7 @@ func (b *BucketManager) PruneSlots(epoch iotago.EpochIndex, pruningRange [2]iota } } - // shutting down the storage does not prevent this storage from being used again and only forces a flush. - b.Shutdown() + _ = b.Flush() return nil } From 60c331991212f96e843756676163f746cff0f21e Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 15 Nov 2023 16:02:32 +0100 Subject: [PATCH 07/13] calm the doggo --- pkg/storage/database/openablekvstore.go | 1 + pkg/storage/prunable/bucket_manager.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/pkg/storage/database/openablekvstore.go b/pkg/storage/database/openablekvstore.go index 541d620e7..35e4cd818 100644 --- a/pkg/storage/database/openablekvstore.go +++ b/pkg/storage/database/openablekvstore.go @@ -30,6 +30,7 @@ func (s *openableKVStore) topParent() *openableKVStore { for current.parentStore != nil { current = current.parentStore } + return current } diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index cf4f9902b..2863e0c8f 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -75,6 +75,7 @@ func (b *BucketManager) Shutdown() { b.openDBs.ForEach(func(epoch iotago.EpochIndex, db *database.DBInstance) bool { db.Shutdown() b.openDBs.Delete(epoch) + return true }) } @@ -244,6 +245,7 @@ func (b *BucketManager) Flush() error { if err := db.KVStore().Flush(); err != nil { innerErr = err } + return true }) From 17799cf73bff99a322e14a665b5869eda2a50542 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 15 Nov 2023 16:21:32 +0100 Subject: [PATCH 08/13] Put the database in the cache directly so we can use it during eviction to close it --- pkg/storage/prunable/bucket_manager.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index 2863e0c8f..f0da4181a 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -6,7 +6,6 @@ import ( "github.com/zyedidia/generic/cache" "github.com/iotaledger/hive.go/ds/shrinkingmap" - "github.com/iotaledger/hive.go/ds/types" "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/hive.go/lo" @@ -18,7 +17,7 @@ import ( ) type BucketManager struct { - openDBsCache *cache.Cache[iotago.EpochIndex, types.Empty] + openDBsCache *cache.Cache[iotago.EpochIndex, *database.DBInstance] openDBs *shrinkingmap.ShrinkingMap[iotago.EpochIndex, *database.DBInstance] lastPrunedEpoch *model.EvictionIndex[iotago.EpochIndex] @@ -44,11 +43,9 @@ func NewBucketManager(dbConfig database.Config, errorHandler func(error), opts . lastPrunedEpoch: model.NewEvictionIndex[iotago.EpochIndex](), }, opts, func(m *BucketManager) { // We use an LRU cache to try closing unnecessary databases. - m.openDBsCache = cache.New[iotago.EpochIndex, types.Empty](m.optsMaxOpenDBs) - m.openDBsCache.SetEvictCallback(func(baseIndex iotago.EpochIndex, _ types.Empty) { - if db, exits := m.openDBs.Get(baseIndex); exits { - db.Close() - } + m.openDBsCache = cache.New[iotago.EpochIndex, *database.DBInstance](m.optsMaxOpenDBs) + m.openDBsCache.SetEvictCallback(func(baseIndex iotago.EpochIndex, db *database.DBInstance) { + db.Close() }) }) } @@ -179,7 +176,7 @@ func (b *BucketManager) getDBInstance(epoch iotago.EpochIndex) *database.DBInsta })) // Mark the db as used in the cache - b.openDBsCache.Put(epoch, types.Void) + b.openDBsCache.Put(epoch, db) return db } From 367c1518a2b689df78256ea16967813022676664 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Thu, 16 Nov 2023 09:15:47 +0100 Subject: [PATCH 09/13] Fixed data races --- pkg/storage/prunable/bucket_manager.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index f0da4181a..a76a96259 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -17,8 +17,10 @@ import ( ) type BucketManager struct { - openDBsCache *cache.Cache[iotago.EpochIndex, *database.DBInstance] - openDBs *shrinkingmap.ShrinkingMap[iotago.EpochIndex, *database.DBInstance] + openDBsCache *cache.Cache[iotago.EpochIndex, *database.DBInstance] + openDBsCacheMutex syncutils.RWMutex + + openDBs *shrinkingmap.ShrinkingMap[iotago.EpochIndex, *database.DBInstance] lastPrunedEpoch *model.EvictionIndex[iotago.EpochIndex] lastPrunedMutex syncutils.RWMutex @@ -69,8 +71,12 @@ func (b *BucketManager) Get(epoch iotago.EpochIndex, realm kvstore.Realm) (kvsto } func (b *BucketManager) Shutdown() { + b.openDBsCacheMutex.Lock() + defer b.openDBsCacheMutex.Unlock() + b.openDBs.ForEach(func(epoch iotago.EpochIndex, db *database.DBInstance) bool { db.Shutdown() + b.openDBsCache.Remove(epoch) b.openDBs.Delete(epoch) return true @@ -165,6 +171,9 @@ func (b *BucketManager) getDBInstance(epoch iotago.EpochIndex) *database.DBInsta b.mutex.RLock() defer b.mutex.RUnlock() + b.openDBsCacheMutex.Lock() + defer b.openDBsCacheMutex.Unlock() + // check if exists again, as other goroutine might have created it in parallel db := lo.Return1(b.openDBs.GetOrCreate(epoch, func() *database.DBInstance { db := database.NewDBInstance(b.dbConfig.WithDirectory(dbPathFromIndex(b.dbConfig.Directory, epoch))) @@ -199,6 +208,9 @@ func (b *BucketManager) Prune(epoch iotago.EpochIndex) error { // DeleteBucket deletes directory that stores the data for the given bucket and returns boolean // flag indicating whether a directory for that bucket existed. func (b *BucketManager) DeleteBucket(epoch iotago.EpochIndex) (deleted bool) { + b.openDBsCacheMutex.Lock() + defer b.openDBsCacheMutex.Unlock() + if exists, err := PathExists(dbPathFromIndex(b.dbConfig.Directory, epoch)); err != nil { panic(err) } else if !exists { @@ -208,6 +220,7 @@ func (b *BucketManager) DeleteBucket(epoch iotago.EpochIndex) (deleted bool) { db, exists := b.openDBs.Get(epoch) if exists { db.Shutdown() + b.openDBsCache.Remove(epoch) b.openDBs.Delete(epoch) } From f515ac479f55abf2bd907f4fdfdbfc00f5b0363c Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Thu, 16 Nov 2023 10:04:51 +0100 Subject: [PATCH 10/13] Generate and upload a feature network snapshot --- .github/workflows/feature-network-deploy.yml | 27 +++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/.github/workflows/feature-network-deploy.yml b/.github/workflows/feature-network-deploy.yml index 812411683..d0385c98f 100644 --- a/.github/workflows/feature-network-deploy.yml +++ b/.github/workflows/feature-network-deploy.yml @@ -46,6 +46,31 @@ jobs: cache-from: type=local,src=/tmp/.buildx-cache cache-to: type=local,mode=max,dest=/tmp/.buildx-cache-new + - uses: actions/setup-go@v4 + with: + go-version-file: 'tools/genesis-snapshot/go.mod' + cache: false + + - name: Print Go version + run: go version + + - name: Generate genesis snapshot + working-directory: tools/genesis-snapshot + run: go run -tags=rocksdb . --config feature --seed 7R1itJx5hVuo9w9hjg5cwKFmek4HMSoBDgJZN8hKGxih --filename genesis-snapshot.bin + + - name: Upload genesis snapshot + uses: actions/upload-artifact@v3 + with: + name: snapshot.bin + path: tools/genesis-snapshot/genesis-snapshot.bin + + - name: Get artifact URL + id: get-snapshot-url + run: | + ARTIFACT_URL=$(curl -s -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + "https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/artifacts" | jq -r '.artifacts[] | select(.name=="snapshot.bin") | .archive_download_url') + echo "::set-output name=artifact_url::$ARTIFACT_URL" + - # Temp fix # https://github.com/docker/build-push-action/issues/252 # https://github.com/moby/buildkit/issues/1896 @@ -70,7 +95,7 @@ jobs: - name: Ansible deploy env: CUSTOM_SNAPSHOT_URL: '${{ github.event.inputs.snapshotUrl }}' - DEFAULT_SNAPSHOT_URL: 'https://0x0.st/HywH.bin' + DEFAULT_SNAPSHOT_URL: '${{ steps.get-snapshot-url.outputs.artifact_url }}' NETWORK_ENVIRONMENT: '${{ secrets.NETWORK_ENVIRONMENT }}' IOTA_CORE_DOCKER_IMAGE_REPO: 'iotaledger/iota-core' IOTA_CORE_DOCKER_IMAGE_TAG: 'feature' From 441589e80113999f409007dcec87b3ae1d1da529 Mon Sep 17 00:00:00 2001 From: Andrea V <1577639+karimodm@users.noreply.github.com> Date: Thu, 16 Nov 2023 10:09:45 +0100 Subject: [PATCH 11/13] Feature snapshot genesis time at 10_000 slots in the past --- tools/genesis-snapshot/presets/presets.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/genesis-snapshot/presets/presets.go b/tools/genesis-snapshot/presets/presets.go index f44f7a0a9..d12b9c289 100644 --- a/tools/genesis-snapshot/presets/presets.go +++ b/tools/genesis-snapshot/presets/presets.go @@ -242,7 +242,7 @@ var Feature = []options.Option[snapshotcreator.Options]{ iotago.NewV3ProtocolParameters( iotago.WithNetworkOptions("feature", "rms"), iotago.WithSupplyOptions(4_600_000_000_000_000, 100, 1, 10, 100, 100, 100), - iotago.WithTimeProviderOptions(666666, time.Now().Unix(), 10, 13), + iotago.WithTimeProviderOptions(666666, time.Now().Unix()-100_000, 10, 13), // Let's fix genesis at 10_000 slots back. iotago.WithLivenessOptions(30, 30, 10, 20, 30), // increase/decrease threshold = fraction * slotDurationInSeconds * schedulerRate iotago.WithCongestionControlOptions(500, 500, 500, 800000, 500000, 100000, 1000, 100), From 65168dc08b58bcfd4311596bba3f31a5ce6118e5 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Thu, 16 Nov 2023 10:23:19 +0100 Subject: [PATCH 12/13] Echo the artifact url --- .github/workflows/feature-network-deploy.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/feature-network-deploy.yml b/.github/workflows/feature-network-deploy.yml index d0385c98f..07cc0c1d6 100644 --- a/.github/workflows/feature-network-deploy.yml +++ b/.github/workflows/feature-network-deploy.yml @@ -67,9 +67,9 @@ jobs: - name: Get artifact URL id: get-snapshot-url run: | - ARTIFACT_URL=$(curl -s -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ - "https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/artifacts" | jq -r '.artifacts[] | select(.name=="snapshot.bin") | .archive_download_url') - echo "::set-output name=artifact_url::$ARTIFACT_URL" + ARTIFACT_URL=$(curl -s -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" "https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/artifacts" | jq -r '.artifacts[] | select(.name=="snapshot.bin") | .archive_download_url') + echo "Artifact URL: $ARTIFACT_URL" + echo "artifact_url=$ARTIFACT_URL" >> $GITHUB_OUTPUT - # Temp fix # https://github.com/docker/build-push-action/issues/252 From afc619a6abcd8008be81f18e058659f9fe9b1b25 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Thu, 16 Nov 2023 10:53:15 +0100 Subject: [PATCH 13/13] Upload differently --- .github/workflows/feature-network-deploy.yml | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/.github/workflows/feature-network-deploy.yml b/.github/workflows/feature-network-deploy.yml index 07cc0c1d6..3a09e46d1 100644 --- a/.github/workflows/feature-network-deploy.yml +++ b/.github/workflows/feature-network-deploy.yml @@ -58,18 +58,12 @@ jobs: working-directory: tools/genesis-snapshot run: go run -tags=rocksdb . --config feature --seed 7R1itJx5hVuo9w9hjg5cwKFmek4HMSoBDgJZN8hKGxih --filename genesis-snapshot.bin - - name: Upload genesis snapshot - uses: actions/upload-artifact@v3 - with: - name: snapshot.bin - path: tools/genesis-snapshot/genesis-snapshot.bin - - - name: Get artifact URL - id: get-snapshot-url + - name: Upload snapshot + id: upload-snapshot run: | - ARTIFACT_URL=$(curl -s -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" "https://api.github.com/repos/${{ github.repository }}/actions/runs/${{ github.run_id }}/artifacts" | jq -r '.artifacts[] | select(.name=="snapshot.bin") | .archive_download_url') - echo "Artifact URL: $ARTIFACT_URL" - echo "artifact_url=$ARTIFACT_URL" >> $GITHUB_OUTPUT + SNAPSHOT_URL=$(curl -T ./tools/genesis-snapshot/genesis-snapshot.bin https://transfer.sh) + echo "Snapshot URL: $SNAPSHOT_URL" + echo "snapshot_url=$SNAPSHOT_URL" >> $GITHUB_OUTPUT - # Temp fix # https://github.com/docker/build-push-action/issues/252 @@ -95,7 +89,7 @@ jobs: - name: Ansible deploy env: CUSTOM_SNAPSHOT_URL: '${{ github.event.inputs.snapshotUrl }}' - DEFAULT_SNAPSHOT_URL: '${{ steps.get-snapshot-url.outputs.artifact_url }}' + DEFAULT_SNAPSHOT_URL: '${{ steps.upload-snapshot.outputs.snapshot_url }}' NETWORK_ENVIRONMENT: '${{ secrets.NETWORK_ENVIRONMENT }}' IOTA_CORE_DOCKER_IMAGE_REPO: 'iotaledger/iota-core' IOTA_CORE_DOCKER_IMAGE_TAG: 'feature'