From 3b55683bced0485f8264528a044c58c083ee44a4 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 6 Nov 2023 21:15:09 +0100 Subject: [PATCH 01/14] Refactor: Reset logic (#496) --- pkg/protocol/block_dispatcher.go | 5 +- .../blockgadget/testframework_test.go | 2 +- pkg/protocol/engine/engine.go | 3 +- pkg/protocol/enginemanager/enginemanager.go | 2 +- .../seatmanager/topstakers/topstakers_test.go | 4 +- .../performance/testsuite_test.go | 8 +- pkg/storage/permanent/settings.go | 68 +++++++++----- pkg/storage/prunable/bucket_manager.go | 18 ++-- pkg/storage/prunable/epochstore/constants.go | 7 ++ pkg/storage/prunable/epochstore/epoch_kv.go | 49 ++++++++-- pkg/storage/prunable/epochstore/store.go | 61 ++++++++++--- pkg/storage/prunable/prunable.go | 90 ++++++++++--------- pkg/storage/prunable/prunable_epoch.go | 1 - pkg/storage/storage_prunable.go | 68 ++++++++++---- 14 files changed, 270 insertions(+), 116 deletions(-) create mode 100644 pkg/storage/prunable/epochstore/constants.go diff --git a/pkg/protocol/block_dispatcher.go b/pkg/protocol/block_dispatcher.go index d718ee350..376b6e3a7 100644 --- a/pkg/protocol/block_dispatcher.go +++ b/pkg/protocol/block_dispatcher.go @@ -242,7 +242,10 @@ func (b *BlockDispatcher) processWarpSyncResponse(commitmentID iotago.Commitment b.processedWarpSyncRequests.Add(commitmentID) - // make sure the engine is clean before we start processing the blocks + // make sure the engine is clean and requires a warp-sync before we start processing the blocks + if targetEngine.Workers.WaitChildren(); targetEngine.Storage.Settings().LatestCommitment().ID().Slot() > commitmentID.Slot() { + return nil + } targetEngine.Reset() // Once all blocks are booked we diff --git a/pkg/protocol/engine/consensus/blockgadget/testframework_test.go b/pkg/protocol/engine/consensus/blockgadget/testframework_test.go index a2d7371bc..211c540c0 100644 --- a/pkg/protocol/engine/consensus/blockgadget/testframework_test.go +++ b/pkg/protocol/engine/consensus/blockgadget/testframework_test.go @@ -43,7 +43,7 @@ func NewTestFramework(test *testing.T) *TestFramework { T: test, blocks: shrinkingmap.New[string, *blocks.Block](), - SeatManager: mock.NewManualPOA(api.SingleVersionProvider(tpkg.TestAPI), epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)), + SeatManager: mock.NewManualPOA(api.SingleVersionProvider(tpkg.TestAPI), epochstore.NewStore(kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)), } evictionState := eviction.NewState(mapdb.NewMapDB(), func(slot iotago.SlotIndex) (*slotstore.Store[iotago.BlockID, iotago.CommitmentID], error) { diff --git a/pkg/protocol/engine/engine.go b/pkg/protocol/engine/engine.go index 2db383500..677572c90 100644 --- a/pkg/protocol/engine/engine.go +++ b/pkg/protocol/engine/engine.go @@ -212,6 +212,7 @@ func New( func(e *Engine) { fmt.Println("Engine Settings", e.Storage.Settings().String()) }, + (*Engine).Reset, (*Engine).TriggerInitialized, ) } @@ -223,8 +224,6 @@ func (e *Engine) ProcessBlockFromPeer(block *model.Block, source peer.ID) { // Reset resets the component to a clean state as if it was created at the last commitment. func (e *Engine) Reset() { - e.Workers.WaitChildren() - e.BlockRequester.Clear() e.Storage.Reset() e.EvictionState.Reset() diff --git a/pkg/protocol/enginemanager/enginemanager.go b/pkg/protocol/enginemanager/enginemanager.go index a36b01175..06cf85631 100644 --- a/pkg/protocol/enginemanager/enginemanager.go +++ b/pkg/protocol/enginemanager/enginemanager.go @@ -307,7 +307,7 @@ func (e *EngineManager) rollbackStorage(newStorage *storage.Storage, slot iotago return ierrors.Wrap(err, "failed to rollback settings") } - if err := newStorage.RollbackPrunable(slot); err != nil { + if err := newStorage.Rollback(slot); err != nil { return ierrors.Wrap(err, "failed to rollback prunable data") } diff --git a/pkg/protocol/sybilprotection/seatmanager/topstakers/topstakers_test.go b/pkg/protocol/sybilprotection/seatmanager/topstakers/topstakers_test.go index 54790e263..dded90610 100644 --- a/pkg/protocol/sybilprotection/seatmanager/topstakers/topstakers_test.go +++ b/pkg/protocol/sybilprotection/seatmanager/topstakers/topstakers_test.go @@ -21,7 +21,7 @@ import ( ) func TestTopStakers_InitializeCommittee(t *testing.T) { - committeeStore := epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes) + committeeStore := epochstore.NewStore(kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes) topStakersSeatManager := &SeatManager{ apiProvider: api.SingleVersionProvider(tpkg.TestAPI), @@ -58,7 +58,7 @@ func TestTopStakers_InitializeCommittee(t *testing.T) { } func TestTopStakers_RotateCommittee(t *testing.T) { - committeeStore := epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes) + committeeStore := epochstore.NewStore(kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes) topStakersSeatManager := &SeatManager{ apiProvider: api.SingleVersionProvider(tpkg.TestAPI), diff --git a/pkg/protocol/sybilprotection/sybilprotectionv1/performance/testsuite_test.go b/pkg/protocol/sybilprotection/sybilprotectionv1/performance/testsuite_test.go index 62c5c1e48..7b4027b2a 100644 --- a/pkg/protocol/sybilprotection/sybilprotectionv1/performance/testsuite_test.go +++ b/pkg/protocol/sybilprotection/sybilprotectionv1/performance/testsuite_test.go @@ -70,10 +70,10 @@ func (t *TestSuite) InitPerformanceTracker() { return p, nil } - rewardsStore := epochstore.NewEpochKVStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0) - poolStatsStore := epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*model.PoolsStats).Bytes, model.PoolsStatsFromBytes) - committeeStore := epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes) - committeeCandidatesStore := epochstore.NewEpochKVStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0) + rewardsStore := epochstore.NewEpochKVStore(kvstore.Realm{}, mapdb.NewMapDB(), 0) + poolStatsStore := epochstore.NewStore(kvstore.Realm{}, mapdb.NewMapDB(), 0, (*model.PoolsStats).Bytes, model.PoolsStatsFromBytes) + committeeStore := epochstore.NewStore(kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes) + committeeCandidatesStore := epochstore.NewEpochKVStore(kvstore.Realm{}, mapdb.NewMapDB(), 0) t.Instance = NewTracker( rewardsStore.GetEpoch, diff --git a/pkg/storage/permanent/settings.go b/pkg/storage/permanent/settings.go index d1ad5fe82..7cfe95ed5 100644 --- a/pkg/storage/permanent/settings.go +++ b/pkg/storage/permanent/settings.go @@ -22,6 +22,7 @@ const ( snapshotImportedKey = iota latestCommitmentKey latestFinalizedSlotKey + latestStoredSlotKey protocolVersionEpochMappingKey futureProtocolParametersKey protocolParametersKey @@ -34,6 +35,7 @@ type Settings struct { storeSnapshotImported *kvstore.TypedValue[bool] storeLatestCommitment *kvstore.TypedValue[*model.Commitment] storeLatestFinalizedSlot *kvstore.TypedValue[iotago.SlotIndex] + storeLatestProcessedSlot *kvstore.TypedValue[iotago.SlotIndex] storeLatestIssuedValidationBlock *kvstore.TypedValue[*model.Block] storeProtocolVersionEpochMapping *kvstore.TypedStore[iotago.Version, iotago.EpochIndex] storeFutureProtocolParameters *kvstore.TypedStore[iotago.Version, *types.Tuple[iotago.EpochIndex, iotago.Identifier]] @@ -83,6 +85,12 @@ func NewSettings(store kvstore.KVStore, opts ...options.Option[api.EpochBasedPro iotago.SlotIndex.Bytes, iotago.SlotIndexFromBytes, ), + storeLatestProcessedSlot: kvstore.NewTypedValue( + store, + []byte{latestStoredSlotKey}, + iotago.SlotIndex.Bytes, + iotago.SlotIndexFromBytes, + ), storeLatestIssuedValidationBlock: kvstore.NewTypedValue( store, []byte{latestIssuedValidationBlock}, @@ -285,7 +293,7 @@ func (s *Settings) LatestFinalizedSlot() iotago.SlotIndex { s.mutex.RLock() defer s.mutex.RUnlock() - return s.latestFinalizedSlot() + return read(s.storeLatestFinalizedSlot) } func (s *Settings) SetLatestFinalizedSlot(slot iotago.SlotIndex) (err error) { @@ -295,23 +303,42 @@ func (s *Settings) SetLatestFinalizedSlot(slot iotago.SlotIndex) (err error) { return s.storeLatestFinalizedSlot.Set(slot) } -func (s *Settings) latestFinalizedSlot() iotago.SlotIndex { - latestFinalizedSlot, err := s.storeLatestFinalizedSlot.Get() - if err != nil { - if ierrors.Is(err, kvstore.ErrKeyNotFound) { - return 0 +func (s *Settings) LatestStoredSlot() iotago.SlotIndex { + s.mutex.RLock() + defer s.mutex.RUnlock() + + return read(s.storeLatestProcessedSlot) +} + +func (s *Settings) SetLatestStoredSlot(slot iotago.SlotIndex) (err error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + return s.storeLatestProcessedSlot.Set(slot) +} + +func (s *Settings) AdvanceLatestStoredSlot(slot iotago.SlotIndex) (err error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + if _, err = s.storeLatestProcessedSlot.Compute(func(latestStoredSlot iotago.SlotIndex, _ bool) (newValue iotago.SlotIndex, err error) { + if latestStoredSlot >= slot { + return latestStoredSlot, kvstore.ErrTypedValueNotChanged } - panic(err) + + return slot, nil + }); err != nil { + return ierrors.Wrap(err, "failed to advance latest stored slot") } - return latestFinalizedSlot + return nil } func (s *Settings) LatestIssuedValidationBlock() *model.Block { s.mutex.RLock() defer s.mutex.RUnlock() - return s.latestIssuedValidationBlock() + return read(s.storeLatestIssuedValidationBlock) } func (s *Settings) SetLatestIssuedValidationBlock(block *model.Block) (err error) { @@ -321,18 +348,6 @@ func (s *Settings) SetLatestIssuedValidationBlock(block *model.Block) (err error return s.storeLatestIssuedValidationBlock.Set(block) } -func (s *Settings) latestIssuedValidationBlock() *model.Block { - block, err := s.storeLatestIssuedValidationBlock.Get() - if err != nil { - if ierrors.Is(err, kvstore.ErrKeyNotFound) { - return nil - } - panic(err) - } - - return block -} - func (s *Settings) Export(writer io.WriteSeeker, targetCommitment *iotago.Commitment) error { var commitmentBytes []byte var err error @@ -579,7 +594,7 @@ func (s *Settings) String() string { builder := stringify.NewStructBuilder("Settings") builder.AddField(stringify.NewStructField("IsSnapshotImported", lo.PanicOnErr(s.storeSnapshotImported.Has()))) builder.AddField(stringify.NewStructField("LatestCommitment", s.latestCommitment())) - builder.AddField(stringify.NewStructField("LatestFinalizedSlot", s.latestFinalizedSlot())) + builder.AddField(stringify.NewStructField("LatestFinalizedSlot", read(s.storeLatestFinalizedSlot))) if err := s.storeProtocolParameters.Iterate(kvstore.EmptyPrefix, func(version iotago.Version, protocolParams iotago.ProtocolParameters) bool { builder.AddField(stringify.NewStructField(fmt.Sprintf("ProtocolParameters(%d)", version), protocolParams)) @@ -590,3 +605,12 @@ func (s *Settings) String() string { return builder.String() } + +func read[T any](typedValue *kvstore.TypedValue[T]) (value T) { + value, err := typedValue.Get() + if err != nil && !ierrors.Is(err, kvstore.ErrKeyNotFound) { + panic(err) + } + + return value +} diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index 980d9d6b6..907fd054a 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -236,18 +236,22 @@ func (b *BucketManager) DeleteBucket(epoch iotago.EpochIndex) (deleted bool) { return true } -// RollbackBucket removes data in the bucket in slots [targetSlotIndex+1; epochEndSlot]. -func (b *BucketManager) RollbackBucket(epoch iotago.EpochIndex, targetSlot, epochEndSlot iotago.SlotIndex) error { - oldBucketKvStore := b.getDBInstance(epoch).KVStore() - for clearSlot := targetSlot + 1; clearSlot <= epochEndSlot; clearSlot++ { - // delete slot prefix from forkedPrunable storage that will be eventually copied into the new engine - if err := oldBucketKvStore.DeletePrefix(clearSlot.MustBytes()); err != nil { - return ierrors.Wrapf(err, "error while clearing slot %d in bucket for epoch %d", clearSlot, epoch) +// PruneSlots prunes the data of all slots in the range [from, to] in the given epoch. +func (b *BucketManager) PruneSlots(epoch iotago.EpochIndex, pruningRange [2]iotago.SlotIndex) error { + epochStore := b.getDBInstance(epoch).KVStore() + + for slot := pruningRange[0]; slot <= pruningRange[1]; slot++ { + if err := epochStore.DeletePrefix(slot.MustBytes()); err != nil { + return ierrors.Wrapf(err, "error while clearing slot %d in bucket for epoch %d", slot, epoch) } } + // shutting down the storage does not prevent this storage from being used again and only forces a flush. + b.Shutdown() + return nil } + func (b *BucketManager) Flush() error { b.openDBsMutex.RLock() defer b.openDBsMutex.RUnlock() diff --git a/pkg/storage/prunable/epochstore/constants.go b/pkg/storage/prunable/epochstore/constants.go new file mode 100644 index 000000000..f27e0f4ac --- /dev/null +++ b/pkg/storage/prunable/epochstore/constants.go @@ -0,0 +1,7 @@ +package epochstore + +const ( + entriesKey byte = iota + lastAccessedEpochKey + lastPrunedEpochKey +) diff --git a/pkg/storage/prunable/epochstore/epoch_kv.go b/pkg/storage/prunable/epochstore/epoch_kv.go index 3fd580eb7..aa8aa361d 100644 --- a/pkg/storage/prunable/epochstore/epoch_kv.go +++ b/pkg/storage/prunable/epochstore/epoch_kv.go @@ -14,16 +14,18 @@ type EpochKVStore struct { kv kvstore.KVStore pruningDelay iotago.EpochIndex - lastPrunedEpoch *model.PruningIndex + lastAccessedEpoch *kvstore.TypedValue[iotago.EpochIndex] + lastPrunedEpoch *model.PruningIndex } -func NewEpochKVStore(storeRealm, pruningRealm kvstore.Realm, kv kvstore.KVStore, pruningDelay iotago.EpochIndex) *EpochKVStore { +func NewEpochKVStore(storeRealm kvstore.Realm, kv kvstore.KVStore, pruningDelay iotago.EpochIndex) *EpochKVStore { return &EpochKVStore{ - realm: storeRealm, - kv: lo.PanicOnErr(kv.WithExtendedRealm(storeRealm)), - pruningDelay: pruningDelay, - lastPrunedEpoch: model.NewPruningIndex(lo.PanicOnErr(kv.WithExtendedRealm(pruningRealm)), storeRealm), + realm: storeRealm, + kv: lo.PanicOnErr(kv.WithExtendedRealm(append(storeRealm, entriesKey))), + pruningDelay: pruningDelay, + lastAccessedEpoch: kvstore.NewTypedValue(kv, append(storeRealm, lastAccessedEpochKey), iotago.EpochIndex.Bytes, iotago.EpochIndexFromBytes), + lastPrunedEpoch: model.NewPruningIndex(lo.PanicOnErr(kv.WithExtendedRealm(storeRealm)), kvstore.Realm{lastPrunedEpochKey}), } } @@ -37,11 +39,31 @@ func (e *EpochKVStore) RestoreLastPrunedEpoch() error { return e.lastPrunedEpoch.RestoreFromDisk() } +func (e *EpochKVStore) LastAccessedEpoch() (lastAccessedEpoch iotago.EpochIndex, err error) { + if lastAccessedEpoch, err = e.lastAccessedEpoch.Get(); err != nil { + if !ierrors.Is(err, kvstore.ErrKeyNotFound) { + err = ierrors.Wrap(err, "failed to get last accessed epoch") + } else { + err = nil + } + } + + return lastAccessedEpoch, err +} + func (e *EpochKVStore) LastPrunedEpoch() (iotago.EpochIndex, bool) { return e.lastPrunedEpoch.Index() } func (e *EpochKVStore) GetEpoch(epoch iotago.EpochIndex) (kvstore.KVStore, error) { + _, _ = e.lastAccessedEpoch.Compute(func(lastAccessedEpoch iotago.EpochIndex, exists bool) (newValue iotago.EpochIndex, err error) { + if lastAccessedEpoch >= epoch { + return lastAccessedEpoch, kvstore.ErrTypedValueNotChanged + } + + return epoch, nil + }) + if e.isTooOld(epoch) { return nil, ierrors.Wrapf(database.ErrEpochPruned, "epoch %d is too old", epoch) } @@ -81,3 +103,18 @@ func (e *EpochKVStore) Prune(epoch iotago.EpochIndex, defaultPruningDelay iotago return nil } + +func (e *EpochKVStore) RollbackEpochs(epoch iotago.EpochIndex) (lastPrunedEpoch iotago.EpochIndex, err error) { + lastAccessedEpoch, err := e.LastAccessedEpoch() + if err != nil { + return lastAccessedEpoch, ierrors.Wrap(err, "failed to get last accessed epoch") + } + + for epochToPrune := epoch; epochToPrune <= lastAccessedEpoch; epochToPrune++ { + if err = e.DeleteEpoch(epochToPrune); err != nil { + return epochToPrune, ierrors.Wrapf(err, "error while deleting epoch %d", epochToPrune) + } + } + + return lastAccessedEpoch, nil +} diff --git a/pkg/storage/prunable/epochstore/store.go b/pkg/storage/prunable/epochstore/store.go index f93d71832..e2fbc32c7 100644 --- a/pkg/storage/prunable/epochstore/store.go +++ b/pkg/storage/prunable/epochstore/store.go @@ -14,28 +14,36 @@ type Store[V any] struct { kv *kvstore.TypedStore[iotago.EpochIndex, V] pruningDelay iotago.EpochIndex - lastPrunedEpoch *model.PruningIndex + lastAccessedEpoch *kvstore.TypedValue[iotago.EpochIndex] + lastPrunedEpoch *model.PruningIndex } -func NewStore[V any](storeRealm, pruningRealm kvstore.Realm, kv kvstore.KVStore, pruningDelay iotago.EpochIndex, vToBytes kvstore.ObjectToBytes[V], bytesToV kvstore.BytesToObject[V]) *Store[V] { +func NewStore[V any](storeRealm kvstore.Realm, kv kvstore.KVStore, pruningDelay iotago.EpochIndex, vToBytes kvstore.ObjectToBytes[V], bytesToV kvstore.BytesToObject[V]) *Store[V] { return &Store[V]{ - realm: storeRealm, - kv: kvstore.NewTypedStore(lo.PanicOnErr(kv.WithExtendedRealm(storeRealm)), iotago.EpochIndex.Bytes, iotago.EpochIndexFromBytes, vToBytes, bytesToV), - pruningDelay: pruningDelay, - lastPrunedEpoch: model.NewPruningIndex(lo.PanicOnErr(kv.WithExtendedRealm(pruningRealm)), storeRealm), + realm: storeRealm, + kv: kvstore.NewTypedStore(lo.PanicOnErr(kv.WithExtendedRealm(append(storeRealm, entriesKey))), iotago.EpochIndex.Bytes, iotago.EpochIndexFromBytes, vToBytes, bytesToV), + pruningDelay: pruningDelay, + lastAccessedEpoch: kvstore.NewTypedValue(kv, append(storeRealm, lastAccessedEpochKey), iotago.EpochIndex.Bytes, iotago.EpochIndexFromBytes), + lastPrunedEpoch: model.NewPruningIndex(lo.PanicOnErr(kv.WithExtendedRealm(storeRealm)), kvstore.Realm{lastPrunedEpochKey}), } } -func (s *Store[V]) isTooOld(epoch iotago.EpochIndex) bool { - prunedEpoch, hasPruned := s.lastPrunedEpoch.Index() - - return hasPruned && epoch <= prunedEpoch -} - func (s *Store[V]) RestoreLastPrunedEpoch() error { return s.lastPrunedEpoch.RestoreFromDisk() } +func (s *Store[V]) LastAccessedEpoch() (lastAccessedEpoch iotago.EpochIndex, err error) { + if lastAccessedEpoch, err = s.lastAccessedEpoch.Get(); err != nil { + if !ierrors.Is(err, kvstore.ErrKeyNotFound) { + err = ierrors.Wrap(err, "failed to get last accessed epoch") + } else { + err = nil + } + } + + return lastAccessedEpoch, err +} + func (s *Store[V]) LastPrunedEpoch() (iotago.EpochIndex, bool) { return s.lastPrunedEpoch.Index() } @@ -61,6 +69,14 @@ func (s *Store[V]) Load(epoch iotago.EpochIndex) (V, error) { } func (s *Store[V]) Store(epoch iotago.EpochIndex, value V) error { + _, _ = s.lastAccessedEpoch.Compute(func(lastAccessedEpoch iotago.EpochIndex, exists bool) (newValue iotago.EpochIndex, err error) { + if lastAccessedEpoch >= epoch { + return lastAccessedEpoch, kvstore.ErrTypedValueNotChanged + } + + return epoch, nil + }) + if s.isTooOld(epoch) { return ierrors.Wrapf(database.ErrEpochPruned, "epoch %d is too old", epoch) } @@ -133,3 +149,24 @@ func (s *Store[V]) Prune(epoch iotago.EpochIndex, defaultPruningDelay iotago.Epo return nil } + +func (s *Store[V]) RollbackEpochs(epoch iotago.EpochIndex) (lastPrunedEpoch iotago.EpochIndex, err error) { + lastAccessedEpoch, err := s.LastAccessedEpoch() + if err != nil { + return lastAccessedEpoch, ierrors.Wrap(err, "failed to get last accessed epoch") + } + + for epochToPrune := epoch; epochToPrune <= lastAccessedEpoch; epochToPrune++ { + if err = s.DeleteEpoch(epochToPrune); err != nil { + return epochToPrune, ierrors.Wrapf(err, "error while deleting epoch %d", epochToPrune) + } + } + + return lastAccessedEpoch, nil +} + +func (s *Store[V]) isTooOld(epoch iotago.EpochIndex) bool { + prunedEpoch, hasPruned := s.lastPrunedEpoch.Index() + + return hasPruned && epoch <= prunedEpoch +} diff --git a/pkg/storage/prunable/prunable.go b/pkg/storage/prunable/prunable.go index 9e4396539..680c1f0e6 100644 --- a/pkg/storage/prunable/prunable.go +++ b/pkg/storage/prunable/prunable.go @@ -41,10 +41,10 @@ func New(dbConfig database.Config, apiProvider iotago.APIProvider, errorHandler semiPermanentDBConfig: semiPermanentDBConfig, semiPermanentDB: semiPermanentDB, - decidedUpgradeSignals: epochstore.NewStore(kvstore.Realm{epochPrefixDecidedUpgradeSignals}, kvstore.Realm{lastPrunedEpochKey}, semiPermanentDB.KVStore(), pruningDelayDecidedUpgradeSignals, model.VersionAndHash.Bytes, model.VersionAndHashFromBytes), - poolRewards: epochstore.NewEpochKVStore(kvstore.Realm{epochPrefixPoolRewards}, kvstore.Realm{lastPrunedEpochKey}, semiPermanentDB.KVStore(), pruningDelayPoolRewards), - poolStats: epochstore.NewStore(kvstore.Realm{epochPrefixPoolStats}, kvstore.Realm{lastPrunedEpochKey}, semiPermanentDB.KVStore(), pruningDelayPoolStats, (*model.PoolsStats).Bytes, model.PoolsStatsFromBytes), - committee: epochstore.NewStore(kvstore.Realm{epochPrefixCommittee}, kvstore.Realm{lastPrunedEpochKey}, semiPermanentDB.KVStore(), pruningDelayCommittee, (*account.Accounts).Bytes, account.AccountsFromBytes), + decidedUpgradeSignals: epochstore.NewStore(kvstore.Realm{epochPrefixDecidedUpgradeSignals}, semiPermanentDB.KVStore(), pruningDelayDecidedUpgradeSignals, model.VersionAndHash.Bytes, model.VersionAndHashFromBytes), + poolRewards: epochstore.NewEpochKVStore(kvstore.Realm{epochPrefixPoolRewards}, semiPermanentDB.KVStore(), pruningDelayPoolRewards), + poolStats: epochstore.NewStore(kvstore.Realm{epochPrefixPoolStats}, semiPermanentDB.KVStore(), pruningDelayPoolStats, (*model.PoolsStats).Bytes, model.PoolsStatsFromBytes), + committee: epochstore.NewStore(kvstore.Realm{epochPrefixCommittee}, semiPermanentDB.KVStore(), pruningDelayCommittee, (*account.Accounts).Bytes, account.AccountsFromBytes), } } @@ -142,58 +142,64 @@ func (p *Prunable) Flush() { } } -func (p *Prunable) Rollback(targetSlot iotago.SlotIndex) error { - timeProvider := p.apiProvider.APIForSlot(targetSlot).TimeProvider() - targetSlotEpoch := timeProvider.EpochFromSlot(targetSlot) - lastCommittedEpoch := targetSlotEpoch - // if the target index is the last slot of the epoch, the epoch was committed - if timeProvider.EpochEnd(targetSlotEpoch) != targetSlot { - lastCommittedEpoch-- +func (p *Prunable) Rollback(targetEpoch iotago.EpochIndex, pruningRange [2]iotago.SlotIndex) error { + if err := p.prunableSlotStore.PruneSlots(targetEpoch, pruningRange); err != nil { + return ierrors.Wrapf(err, "failed to prune slots in range [%d, %d] from target epoch %d", pruningRange[0], pruningRange[1], targetEpoch) } - if err := p.prunableSlotStore.RollbackBucket(targetSlotEpoch, targetSlot, timeProvider.EpochEnd(targetSlotEpoch)); err != nil { - return ierrors.Wrapf(err, "error while rolling back slots in a bucket for epoch %d", targetSlotEpoch) + if err := p.rollbackCommitteesCandidates(targetEpoch, pruningRange[0]-1); err != nil { + return ierrors.Wrapf(err, "failed to rollback committee candidates to target epoch %d", targetEpoch) } - if err := p.rollbackCommitteesCandidates(targetSlotEpoch, targetSlot); err != nil { - return ierrors.Wrapf(err, "error while rolling back committee for epoch %d", targetSlotEpoch) + lastPrunedCommitteeEpoch, err := p.rollbackCommitteeEpochs(targetEpoch+1, pruningRange[0]-1) + if err != nil { + return ierrors.Wrapf(err, "failed to rollback committee epochs to target epoch %d", targetEpoch) } - // Shut down the prunableSlotStore in order to flush and get consistent state on disk after reopening. - p.prunableSlotStore.Shutdown() + lastPrunedPoolStatsEpoch, err := p.poolStats.RollbackEpochs(targetEpoch) + if err != nil { + return ierrors.Wrapf(err, "failed to rollback pool stats epochs to target epoch %d", targetEpoch) + } - // Removed entries that belong to the old fork and cannot be re-used. - for epoch := lastCommittedEpoch + 1; ; epoch++ { - if epoch > targetSlotEpoch { - shouldRollback, err := p.shouldRollbackCommittee(epoch, targetSlot) - if err != nil { - return ierrors.Wrapf(err, "error while checking if committee for epoch %d should be rolled back", epoch) - } + lastPrunedDecidedUpgradeSignalsEpoch, err := p.decidedUpgradeSignals.RollbackEpochs(targetEpoch) + if err != nil { + return ierrors.Wrapf(err, "failed to rollback decided upgrade signals epochs to target epoch %d", targetEpoch) + } - if shouldRollback { - if err := p.committee.DeleteEpoch(epoch); err != nil { - return ierrors.Wrapf(err, "error while deleting committee for epoch %d", epoch) - } - } + lastPrunedPoolRewardsEpoch, err := p.poolRewards.RollbackEpochs(targetEpoch) + if err != nil { + return ierrors.Wrapf(err, "failed to rollback pool rewards epochs to target epoch %d", targetEpoch) + } - if deleted := p.prunableSlotStore.DeleteBucket(epoch); !deleted { - break - } - } + for epochToPrune := targetEpoch + 1; epochToPrune <= max( + lastPrunedCommitteeEpoch, + lastPrunedPoolStatsEpoch, + lastPrunedDecidedUpgradeSignalsEpoch, + lastPrunedPoolRewardsEpoch, + ); epochToPrune++ { + p.prunableSlotStore.DeleteBucket(epochToPrune) + } - if err := p.poolRewards.DeleteEpoch(epoch); err != nil { - return ierrors.Wrapf(err, "error while deleting pool rewards for epoch %d", epoch) - } - if err := p.poolStats.DeleteEpoch(epoch); err != nil { - return ierrors.Wrapf(err, "error while deleting pool stats for epoch %d", epoch) - } + return nil +} + +func (p *Prunable) rollbackCommitteeEpochs(epoch iotago.EpochIndex, targetSlot iotago.SlotIndex) (lastPrunedEpoch iotago.EpochIndex, err error) { + lastAccessedEpoch, err := p.committee.LastAccessedEpoch() + if err != nil { + return lastAccessedEpoch, ierrors.Wrap(err, "failed to get last accessed committee epoch") + } - if err := p.decidedUpgradeSignals.DeleteEpoch(epoch); err != nil { - return ierrors.Wrapf(err, "error while deleting decided upgrade signals for epoch %d", epoch) + for epochToPrune := epoch; epochToPrune <= lastAccessedEpoch; epochToPrune++ { + if shouldRollback, rollbackErr := p.shouldRollbackCommittee(epochToPrune, targetSlot); rollbackErr != nil { + return epochToPrune, ierrors.Wrapf(rollbackErr, "error while checking if committee for epoch %d should be rolled back", epochToPrune) + } else if shouldRollback { + if err = p.committee.DeleteEpoch(epochToPrune); err != nil { + return epochToPrune, ierrors.Wrapf(err, "error while deleting committee for epoch %d", epochToPrune) + } } } - return nil + return lastAccessedEpoch, nil } // Remove committee for the next epoch only if forking point is before point of no return and committee is reused. diff --git a/pkg/storage/prunable/prunable_epoch.go b/pkg/storage/prunable/prunable_epoch.go index f09d3d075..0c2186dd0 100644 --- a/pkg/storage/prunable/prunable_epoch.go +++ b/pkg/storage/prunable/prunable_epoch.go @@ -13,7 +13,6 @@ const ( epochPrefixPoolRewards epochPrefixPoolStats epochPrefixCommittee - lastPrunedEpochKey ) const ( diff --git a/pkg/storage/storage_prunable.go b/pkg/storage/storage_prunable.go index f49f3b4dd..0760d6591 100644 --- a/pkg/storage/storage_prunable.go +++ b/pkg/storage/storage_prunable.go @@ -44,50 +44,72 @@ func (s *Storage) Blocks(slot iotago.SlotIndex) (*slotstore.Blocks, error) { // Reset resets the component to a clean state as if it was created at the last commitment. func (s *Storage) Reset() { - s.lastAccessedBlocks.Compute(func(lastAccessedBlocks iotago.SlotIndex) iotago.SlotIndex { - latestCommittedSlot := s.Settings().LatestCommitment().Slot() - - for slot := latestCommittedSlot + 1; slot <= lastAccessedBlocks; slot++ { - if blocksForSlot, err := s.prunable.Blocks(slot); err != nil { - s.errorHandler(ierrors.Wrapf(err, "failed to clear blocks at slot %d", slot)) - } else if err = blocksForSlot.Clear(); err != nil { - s.errorHandler(ierrors.Wrapf(err, "failed to clear blocks at slot %d", slot)) - } - } - - return latestCommittedSlot - }) + if err := s.Rollback(s.Settings().LatestCommitment().Slot()); err != nil { + s.errorHandler(ierrors.Wrap(err, "failed to reset prunable storage")) + } } func (s *Storage) RootBlocks(slot iotago.SlotIndex) (*slotstore.Store[iotago.BlockID, iotago.CommitmentID], error) { + if err := s.permanent.Settings().AdvanceLatestStoredSlot(slot); err != nil { + return nil, ierrors.Wrap(err, "failed to advance latest stored slot when accessing root blocks") + } + return s.prunable.RootBlocks(slot) } func (s *Storage) Mutations(slot iotago.SlotIndex) (kvstore.KVStore, error) { + if err := s.permanent.Settings().AdvanceLatestStoredSlot(slot); err != nil { + return nil, ierrors.Wrap(err, "failed to advance latest stored slot when accessing mutations") + } + return s.prunable.Mutations(slot) } func (s *Storage) Attestations(slot iotago.SlotIndex) (kvstore.KVStore, error) { + if err := s.permanent.Settings().AdvanceLatestStoredSlot(slot); err != nil { + return nil, ierrors.Wrap(err, "failed to advance latest stored slot when accessing attestations") + } + return s.prunable.Attestations(slot) } func (s *Storage) AccountDiffs(slot iotago.SlotIndex) (*slotstore.AccountDiffs, error) { + if err := s.permanent.Settings().AdvanceLatestStoredSlot(slot); err != nil { + return nil, ierrors.Wrap(err, "failed to advance latest stored slot when accessing account diffs") + } + return s.prunable.AccountDiffs(slot) } func (s *Storage) ValidatorPerformances(slot iotago.SlotIndex) (*slotstore.Store[iotago.AccountID, *model.ValidatorPerformance], error) { + if err := s.permanent.Settings().AdvanceLatestStoredSlot(slot); err != nil { + return nil, ierrors.Wrap(err, "failed to advance latest stored slot when accessing validator performances") + } + return s.prunable.ValidatorPerformances(slot) } func (s *Storage) UpgradeSignals(slot iotago.SlotIndex) (*slotstore.Store[account.SeatIndex, *model.SignaledBlock], error) { + if err := s.permanent.Settings().AdvanceLatestStoredSlot(slot); err != nil { + return nil, ierrors.Wrap(err, "failed to advance latest stored slot when accessing upgrade signals") + } + return s.prunable.UpgradeSignals(slot) } func (s *Storage) Roots(slot iotago.SlotIndex) (*slotstore.Store[iotago.CommitmentID, *iotago.Roots], error) { + if err := s.permanent.Settings().AdvanceLatestStoredSlot(slot); err != nil { + return nil, ierrors.Wrap(err, "failed to advance latest stored slot when accessing roots") + } + return s.prunable.Roots(slot) } func (s *Storage) Retainer(slot iotago.SlotIndex) (*slotstore.Retainer, error) { + if err := s.permanent.Settings().AdvanceLatestStoredSlot(slot); err != nil { + return nil, ierrors.Wrap(err, "failed to advance latest stored slot when accessing retainer") + } + return s.prunable.Retainer(slot) } @@ -104,6 +126,22 @@ func (s *Storage) RestoreFromDisk() { s.lastPrunedEpoch.MarkEvicted(lastPrunedEpoch) } -func (s *Storage) RollbackPrunable(targetIndex iotago.SlotIndex) error { - return s.prunable.Rollback(targetIndex) +func (s *Storage) Rollback(targetSlot iotago.SlotIndex) error { + if err := s.prunable.Rollback(s.pruningRange(targetSlot)); err != nil { + return ierrors.Wrapf(err, "failed to rollback prunable storage to slot %d", targetSlot) + } + + return nil +} + +func (s *Storage) pruningRange(targetSlot iotago.SlotIndex) (targetEpoch iotago.EpochIndex, pruneRange [2]iotago.SlotIndex) { + epochOfSlot := func(slot iotago.SlotIndex) iotago.EpochIndex { + return s.Settings().APIProvider().APIForSlot(slot).TimeProvider().EpochFromSlot(slot) + } + + if targetEpoch, pruneRange = epochOfSlot(targetSlot), [2]iotago.SlotIndex{targetSlot + 1, s.Settings().LatestStoredSlot()}; epochOfSlot(pruneRange[0]) > targetEpoch { + pruneRange[1] = s.Settings().APIProvider().APIForEpoch(targetEpoch).TimeProvider().EpochEnd(targetEpoch) + } + + return targetEpoch, pruneRange } From 3c0d36239cd5daf24f9e82dd289406a8c535145e Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 6 Nov 2023 21:50:18 +0100 Subject: [PATCH 02/14] Fix: addressed review comment --- .../engine/accounts/accountsledger/manager.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/pkg/protocol/engine/accounts/accountsledger/manager.go b/pkg/protocol/engine/accounts/accountsledger/manager.go index 814b27c57..da04cd2dc 100644 --- a/pkg/protocol/engine/accounts/accountsledger/manager.go +++ b/pkg/protocol/engine/accounts/accountsledger/manager.go @@ -357,29 +357,19 @@ func (m *Manager) AddAccount(output *utxoledger.Output, blockIssuanceCredits iot // Reset resets the component to a clean state as if it was created at the last commitment. func (m *Manager) Reset() { - blockBurnsToDelete := make([]iotago.SlotIndex, 0) m.blockBurns.ForEachKey(func(slot iotago.SlotIndex) bool { if slot > m.latestCommittedSlot { - blockBurnsToDelete = append(blockBurnsToDelete, slot) + m.blockBurns.Delete(slot) } return true }) - versionSignalsToDelete := make([]iotago.SlotIndex, 0) m.latestSupportedVersionSignals.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *model.SignaledBlock]) { if slot > m.latestCommittedSlot { - versionSignalsToDelete = append(versionSignalsToDelete, slot) + m.latestSupportedVersionSignals.Evict(slot) } }) - - for _, slot := range blockBurnsToDelete { - m.blockBurns.Delete(slot) - } - - for _, slot := range versionSignalsToDelete { - m.latestSupportedVersionSignals.Evict(slot) - } } func (m *Manager) rollbackAccountTo(accountData *accounts.AccountData, targetSlot iotago.SlotIndex) (wasDestroyed bool, err error) { From fe439ebdec6cdee2df7ea3886bb0a40acefe4027 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 6 Nov 2023 21:51:40 +0100 Subject: [PATCH 03/14] Fix: addressed another reviewcomment --- .../engine/attestation/slotattestation/manager.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/pkg/protocol/engine/attestation/slotattestation/manager.go b/pkg/protocol/engine/attestation/slotattestation/manager.go index ac69e3871..42b90ed65 100644 --- a/pkg/protocol/engine/attestation/slotattestation/manager.go +++ b/pkg/protocol/engine/attestation/slotattestation/manager.go @@ -322,27 +322,17 @@ func (m *Manager) Rollback(targetSlot iotago.SlotIndex) error { // Reset resets the component to a clean state as if it was created at the last commitment. func (m *Manager) Reset() { - futureAttestationsToClear := make([]iotago.SlotIndex, 0) m.futureAttestations.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *iotago.Attestation]) { if slot > m.lastCommittedSlot { - futureAttestationsToClear = append(futureAttestationsToClear, slot) + m.futureAttestations.Evict(slot) } }) - pendingAttestationsToClear := make([]iotago.SlotIndex, 0) m.pendingAttestations.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *iotago.Attestation]) { if slot > m.lastCommittedSlot { - pendingAttestationsToClear = append(pendingAttestationsToClear, slot) + m.pendingAttestations.Evict(slot) } }) - - for _, slot := range futureAttestationsToClear { - m.futureAttestations.Evict(slot) - } - - for _, slot := range pendingAttestationsToClear { - m.pendingAttestations.Evict(slot) - } } func (m *Manager) computeAttestationCommitmentOffset(slot iotago.SlotIndex) (cutoffSlot iotago.SlotIndex, isValid bool) { From a2eafb382b247cd490ad00616246839c4ee4e9e8 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 6 Nov 2023 21:59:26 +0100 Subject: [PATCH 04/14] Feat: started adding mutexes --- pkg/protocol/engine/attestation/slotattestation/manager.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/protocol/engine/attestation/slotattestation/manager.go b/pkg/protocol/engine/attestation/slotattestation/manager.go index 42b90ed65..18b484373 100644 --- a/pkg/protocol/engine/attestation/slotattestation/manager.go +++ b/pkg/protocol/engine/attestation/slotattestation/manager.go @@ -322,6 +322,9 @@ func (m *Manager) Rollback(targetSlot iotago.SlotIndex) error { // Reset resets the component to a clean state as if it was created at the last commitment. func (m *Manager) Reset() { + m.commitmentMutex.Lock() + defer m.commitmentMutex.Unlock() + m.futureAttestations.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *iotago.Attestation]) { if slot > m.lastCommittedSlot { m.futureAttestations.Evict(slot) From f31eba48a450b32079fdebd0a8af8c7fe2d5fa70 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 6 Nov 2023 22:17:12 +0100 Subject: [PATCH 05/14] Feat: addressed review comment --- pkg/core/buffer/unsolid_commitment_buffer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/core/buffer/unsolid_commitment_buffer.go b/pkg/core/buffer/unsolid_commitment_buffer.go index 100205648..27df0dd51 100644 --- a/pkg/core/buffer/unsolid_commitment_buffer.go +++ b/pkg/core/buffer/unsolid_commitment_buffer.go @@ -139,6 +139,9 @@ func (u *UnsolidCommitmentBuffer[V]) GetValuesAndEvict(commitmentID iotago.Commi // Reset resets the component to a clean state as if it was created at the last commitment. func (u *UnsolidCommitmentBuffer[V]) Reset() { + u.mutex.Lock() + defer u.mutex.Unlock() + u.blockBuffers.Clear() u.commitmentBuffer.Each(func(key iotago.CommitmentID, _ types.Empty) { u.commitmentBuffer.Remove(key) }) } From b504c90d1eeb5073b0638a767c03f6236b5104a9 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 6 Nov 2023 22:19:57 +0100 Subject: [PATCH 06/14] Feat: addressed another comment --- .../engine/accounts/accountsledger/manager.go | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/pkg/protocol/engine/accounts/accountsledger/manager.go b/pkg/protocol/engine/accounts/accountsledger/manager.go index da04cd2dc..31575ef32 100644 --- a/pkg/protocol/engine/accounts/accountsledger/manager.go +++ b/pkg/protocol/engine/accounts/accountsledger/manager.go @@ -357,19 +357,11 @@ func (m *Manager) AddAccount(output *utxoledger.Output, blockIssuanceCredits iot // Reset resets the component to a clean state as if it was created at the last commitment. func (m *Manager) Reset() { - m.blockBurns.ForEachKey(func(slot iotago.SlotIndex) bool { - if slot > m.latestCommittedSlot { - m.blockBurns.Delete(slot) - } - - return true - }) + m.mutex.Lock() + defer m.mutex.Unlock() - m.latestSupportedVersionSignals.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *model.SignaledBlock]) { - if slot > m.latestCommittedSlot { - m.latestSupportedVersionSignals.Evict(slot) - } - }) + m.blockBurns.Clear() + m.latestSupportedVersionSignals.Clear() } func (m *Manager) rollbackAccountTo(accountData *accounts.AccountData, targetSlot iotago.SlotIndex) (wasDestroyed bool, err error) { From 3c093f0c4847973038ab6b895238b503fc4ef8cb Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 6 Nov 2023 22:21:58 +0100 Subject: [PATCH 07/14] Refactor: addressed another comment --- pkg/protocol/engine/attestation/slotattestation/manager.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/protocol/engine/attestation/slotattestation/manager.go b/pkg/protocol/engine/attestation/slotattestation/manager.go index 18b484373..60dbf6603 100644 --- a/pkg/protocol/engine/attestation/slotattestation/manager.go +++ b/pkg/protocol/engine/attestation/slotattestation/manager.go @@ -325,11 +325,7 @@ func (m *Manager) Reset() { m.commitmentMutex.Lock() defer m.commitmentMutex.Unlock() - m.futureAttestations.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *iotago.Attestation]) { - if slot > m.lastCommittedSlot { - m.futureAttestations.Evict(slot) - } - }) + m.futureAttestations.Clear() m.pendingAttestations.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *iotago.Attestation]) { if slot > m.lastCommittedSlot { From 2599cb2d0a83bfd8d42cdfade30120a59f0dc19b Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 6 Nov 2023 22:30:54 +0100 Subject: [PATCH 08/14] Fix: fixed deadlock --- pkg/protocol/engine/attestation/slotattestation/manager.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/protocol/engine/attestation/slotattestation/manager.go b/pkg/protocol/engine/attestation/slotattestation/manager.go index 60dbf6603..25c5196a7 100644 --- a/pkg/protocol/engine/attestation/slotattestation/manager.go +++ b/pkg/protocol/engine/attestation/slotattestation/manager.go @@ -327,11 +327,16 @@ func (m *Manager) Reset() { m.futureAttestations.Clear() + pendingAttestationsToClear := make([]iotago.SlotIndex, 0) m.pendingAttestations.ForEach(func(slot iotago.SlotIndex, _ *shrinkingmap.ShrinkingMap[iotago.AccountID, *iotago.Attestation]) { if slot > m.lastCommittedSlot { - m.pendingAttestations.Evict(slot) + pendingAttestationsToClear = append(pendingAttestationsToClear, slot) } }) + + for _, slot := range pendingAttestationsToClear { + m.pendingAttestations.Evict(slot) + } } func (m *Manager) computeAttestationCommitmentOffset(slot iotago.SlotIndex) (cutoffSlot iotago.SlotIndex, isValid bool) { From 7b131303d7722c9142394bf1f205073176bd80ff Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 6 Nov 2023 22:50:42 +0100 Subject: [PATCH 09/14] Refactor: addressed comments --- pkg/protocol/engine/booker/inmemorybooker/booker.go | 2 +- .../engine/commitmentfilter/accountsfilter/commitmentfilter.go | 2 +- .../engine/congestioncontrol/scheduler/passthrough/scheduler.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/protocol/engine/booker/inmemorybooker/booker.go b/pkg/protocol/engine/booker/inmemorybooker/booker.go index 51f53c52d..817b1c2cb 100644 --- a/pkg/protocol/engine/booker/inmemorybooker/booker.go +++ b/pkg/protocol/engine/booker/inmemorybooker/booker.go @@ -117,7 +117,7 @@ func (b *Booker) Queue(block *blocks.Block) error { } // Reset resets the component to a clean state as if it was created at the last commitment. -func (b *Booker) Reset() {} +func (b *Booker) Reset() { /* nothing to reset but comply with interface */ } func (b *Booker) Shutdown() { b.TriggerStopped() diff --git a/pkg/protocol/engine/commitmentfilter/accountsfilter/commitmentfilter.go b/pkg/protocol/engine/commitmentfilter/accountsfilter/commitmentfilter.go index 589cbf23b..842f06c61 100644 --- a/pkg/protocol/engine/commitmentfilter/accountsfilter/commitmentfilter.go +++ b/pkg/protocol/engine/commitmentfilter/accountsfilter/commitmentfilter.go @@ -191,7 +191,7 @@ func (c *CommitmentFilter) evaluateBlock(block *blocks.Block) { } // Reset resets the component to a clean state as if it was created at the last commitment. -func (c *CommitmentFilter) Reset() {} +func (c *CommitmentFilter) Reset() { /* nothing to reset but comply with interface */ } func (c *CommitmentFilter) Shutdown() { c.TriggerStopped() diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go index 1f46884b3..f85b9fd21 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go @@ -73,4 +73,4 @@ func (s *Scheduler) AddBlock(block *blocks.Block) { } // Reset resets the component to a clean state as if it was created at the last commitment. -func (s *Scheduler) Reset() {} +func (s *Scheduler) Reset() { /* nothing to reset but comply with interface */ } From 40a94e70a0676155cb691f9d1dee676cf444afb6 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Tue, 7 Nov 2023 01:29:58 +0100 Subject: [PATCH 10/14] Refactor: addressed review comments --- pkg/protocol/engine/eviction/state.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/protocol/engine/eviction/state.go b/pkg/protocol/engine/eviction/state.go index da83bff46..7c263e685 100644 --- a/pkg/protocol/engine/eviction/state.go +++ b/pkg/protocol/engine/eviction/state.go @@ -325,20 +325,18 @@ func (s *State) PopulateFromStorage(latestCommitmentSlot iotago.SlotIndex) { } func (s *State) Reset() { - rootBlocksToClear := make([]iotago.BlockID, 0) + s.evictionMutex.Lock() + defer s.evictionMutex.Unlock() + s.rootBlocks.ForEach(func(slot iotago.SlotIndex, storage *shrinkingmap.ShrinkingMap[iotago.BlockID, iotago.CommitmentID]) { if slot > s.lastEvictedSlot { storage.ForEach(func(blockID iotago.BlockID, commitmentID iotago.CommitmentID) bool { - rootBlocksToClear = append(rootBlocksToClear, blockID) + s.RemoveRootBlock(blockID) return true }) } }) - - for _, blockID := range rootBlocksToClear { - s.RemoveRootBlock(blockID) - } } // latestNonEmptySlot returns the latest slot that contains a rootblock. From 0167b635bdb45338cabb0cf162e1598c6e2f0a49 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Tue, 7 Nov 2023 01:34:30 +0100 Subject: [PATCH 11/14] Refactor: addressed comment --- pkg/protocol/engine/filter/blockfilter/filter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/protocol/engine/filter/blockfilter/filter.go b/pkg/protocol/engine/filter/blockfilter/filter.go index 21c62f4cf..daf8cac3b 100644 --- a/pkg/protocol/engine/filter/blockfilter/filter.go +++ b/pkg/protocol/engine/filter/blockfilter/filter.go @@ -118,7 +118,7 @@ func (f *Filter) ProcessReceivedBlock(block *model.Block, source peer.ID) { } // Reset resets the component to a clean state as if it was created at the last commitment. -func (f *Filter) Reset() {} +func (f *Filter) Reset() { /* nothing to reset but comply with interface */ } func (f *Filter) Shutdown() { f.TriggerStopped() From 61723630217e2320864569fa19a1936f1fc4172e Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Tue, 7 Nov 2023 01:45:49 +0100 Subject: [PATCH 12/14] Fix: fixed deadlock --- pkg/protocol/engine/eviction/state.go | 3 --- pkg/protocol/engine/mempool/v1/mempool.go | 15 +++++---------- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/pkg/protocol/engine/eviction/state.go b/pkg/protocol/engine/eviction/state.go index 7c263e685..3f912f0d0 100644 --- a/pkg/protocol/engine/eviction/state.go +++ b/pkg/protocol/engine/eviction/state.go @@ -325,9 +325,6 @@ func (s *State) PopulateFromStorage(latestCommitmentSlot iotago.SlotIndex) { } func (s *State) Reset() { - s.evictionMutex.Lock() - defer s.evictionMutex.Unlock() - s.rootBlocks.ForEach(func(slot iotago.SlotIndex, storage *shrinkingmap.ShrinkingMap[iotago.BlockID, iotago.CommitmentID]) { if slot > s.lastEvictedSlot { storage.ForEach(func(blockID iotago.BlockID, commitmentID iotago.CommitmentID) bool { diff --git a/pkg/protocol/engine/mempool/v1/mempool.go b/pkg/protocol/engine/mempool/v1/mempool.go index 096e7fb0d..52935d6ea 100644 --- a/pkg/protocol/engine/mempool/v1/mempool.go +++ b/pkg/protocol/engine/mempool/v1/mempool.go @@ -199,10 +199,13 @@ func (m *MemPool[VoteRank]) stateDiff(slot iotago.SlotIndex) (*StateDiff, error) // Reset resets the component to a clean state as if it was created at the last commitment. func (m *MemPool[VoteRank]) Reset() { - stateDiffsToDelete := make([]iotago.SlotIndex, 0) m.stateDiffs.ForEachKey(func(slot iotago.SlotIndex) bool { if slot > m.lastEvictedSlot { - stateDiffsToDelete = append(stateDiffsToDelete, slot) + if stateDiff, deleted := m.stateDiffs.DeleteAndReturn(slot); deleted { + if err := stateDiff.Reset(); err != nil { + m.errorHandler(ierrors.Wrapf(err, "failed to reset state diff for slot %d", slot)) + } + } } return true @@ -215,14 +218,6 @@ func (m *MemPool[VoteRank]) Reset() { } }) - for _, slot := range stateDiffsToDelete { - if stateDiff, deleted := m.stateDiffs.DeleteAndReturn(slot); deleted { - if err := stateDiff.Reset(); err != nil { - m.errorHandler(ierrors.Wrapf(err, "failed to reset state diff for slot %d", slot)) - } - } - } - for _, slot := range attachmentsToDelete { if evictedAttachments := m.attachments.Evict(slot); evictedAttachments != nil { evictedAttachments.ForEach(func(id iotago.BlockID, metadata *SignedTransactionMetadata) bool { From 8db794c24bee736b2a702c075fcccf426f03bdf8 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Tue, 7 Nov 2023 02:16:51 +0100 Subject: [PATCH 13/14] Refactor: fixed deadlock --- pkg/protocol/engine/eviction/state.go | 10 +++++----- .../notarization/slotnotarization/slotmutations.go | 13 +------------ 2 files changed, 6 insertions(+), 17 deletions(-) diff --git a/pkg/protocol/engine/eviction/state.go b/pkg/protocol/engine/eviction/state.go index 3f912f0d0..c80699ca9 100644 --- a/pkg/protocol/engine/eviction/state.go +++ b/pkg/protocol/engine/eviction/state.go @@ -325,15 +325,15 @@ func (s *State) PopulateFromStorage(latestCommitmentSlot iotago.SlotIndex) { } func (s *State) Reset() { + blocksToPrune := make([]iotago.BlockID, 0) s.rootBlocks.ForEach(func(slot iotago.SlotIndex, storage *shrinkingmap.ShrinkingMap[iotago.BlockID, iotago.CommitmentID]) { if slot > s.lastEvictedSlot { - storage.ForEach(func(blockID iotago.BlockID, commitmentID iotago.CommitmentID) bool { - s.RemoveRootBlock(blockID) - - return true - }) + blocksToPrune = append(blocksToPrune, storage.Keys()...) } }) + + // we need to prune delayed because s.rootBlocks.ForEach and s.RemoveRootBlock both lock + lo.ForEach(blocksToPrune, s.RemoveRootBlock) } // latestNonEmptySlot returns the latest slot that contains a rootblock. diff --git a/pkg/protocol/engine/notarization/slotnotarization/slotmutations.go b/pkg/protocol/engine/notarization/slotnotarization/slotmutations.go index 9e13a677a..d36d6fbe6 100644 --- a/pkg/protocol/engine/notarization/slotnotarization/slotmutations.go +++ b/pkg/protocol/engine/notarization/slotnotarization/slotmutations.go @@ -63,18 +63,7 @@ func (m *SlotMutations) Evict(index iotago.SlotIndex) error { // Reset resets the component to a clean state as if it was created at the last commitment. func (m *SlotMutations) Reset() { - slotsToReset := make([]iotago.SlotIndex, 0) - m.acceptedBlocksBySlot.ForEachKey(func(slot iotago.SlotIndex) bool { - if slot > m.latestCommittedIndex { - slotsToReset = append(slotsToReset, slot) - } - - return true - }) - - for _, slot := range slotsToReset { - m.acceptedBlocksBySlot.Delete(slot) - } + m.acceptedBlocksBySlot.Clear() } // AcceptedBlocks returns the set of accepted blocks for the given slot. From f4d229aaa96216b18c7a786b4f5b3f26f4e8fb2b Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Tue, 7 Nov 2023 02:44:16 +0100 Subject: [PATCH 14/14] Refactor: cleaned code --- pkg/storage/storage_prunable.go | 6 +----- pkg/tests/loss_of_acceptance_test.go | 2 -- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/pkg/storage/storage_prunable.go b/pkg/storage/storage_prunable.go index 0760d6591..3c437eb04 100644 --- a/pkg/storage/storage_prunable.go +++ b/pkg/storage/storage_prunable.go @@ -127,11 +127,7 @@ func (s *Storage) RestoreFromDisk() { } func (s *Storage) Rollback(targetSlot iotago.SlotIndex) error { - if err := s.prunable.Rollback(s.pruningRange(targetSlot)); err != nil { - return ierrors.Wrapf(err, "failed to rollback prunable storage to slot %d", targetSlot) - } - - return nil + return s.prunable.Rollback(s.pruningRange(targetSlot)) } func (s *Storage) pruningRange(targetSlot iotago.SlotIndex) (targetEpoch iotago.EpochIndex, pruneRange [2]iotago.SlotIndex) { diff --git a/pkg/tests/loss_of_acceptance_test.go b/pkg/tests/loss_of_acceptance_test.go index abf7be665..a8bb02537 100644 --- a/pkg/tests/loss_of_acceptance_test.go +++ b/pkg/tests/loss_of_acceptance_test.go @@ -87,8 +87,6 @@ func TestLossOfAcceptanceFromGenesis(t *testing.T) { { ts.IssueBlocksAtSlots("", []iotago.SlotIndex{58, 59}, 3, "57.2", ts.Nodes("node0", "node1", "node2"), true, nil) - time.Sleep(10 * time.Second) - ts.AssertEqualStoredCommitmentAtIndex(57, ts.Nodes()...) ts.AssertLatestCommitmentSlotIndex(57, ts.Nodes()...) ts.AssertBlocksInCacheAccepted(ts.BlocksWithPrefix("59.0"), true, ts.Nodes()...)