Skip to content

Commit

Permalink
Refactor: refactor reset logic
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoog committed Nov 6, 2023
1 parent 99cc8c9 commit 097daf3
Show file tree
Hide file tree
Showing 13 changed files with 266 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewTestFramework(test *testing.T) *TestFramework {
T: test,
blocks: shrinkingmap.New[string, *blocks.Block](),

SeatManager: mock.NewManualPOA(api.SingleVersionProvider(tpkg.TestAPI), epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)),
SeatManager: mock.NewManualPOA(api.SingleVersionProvider(tpkg.TestAPI), epochstore.NewStore(kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)),
}

evictionState := eviction.NewState(mapdb.NewMapDB(), func(slot iotago.SlotIndex) (*slotstore.Store[iotago.BlockID, iotago.CommitmentID], error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/enginemanager/enginemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (e *EngineManager) rollbackStorage(newStorage *storage.Storage, slot iotago
return ierrors.Wrap(err, "failed to rollback settings")
}

if err := newStorage.RollbackPrunable(slot); err != nil {
if err := newStorage.Rollback(slot); err != nil {
return ierrors.Wrap(err, "failed to rollback prunable data")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

func TestTopStakers_InitializeCommittee(t *testing.T) {
committeeStore := epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)
committeeStore := epochstore.NewStore(kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)

topStakersSeatManager := &SeatManager{
apiProvider: api.SingleVersionProvider(tpkg.TestAPI),
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestTopStakers_InitializeCommittee(t *testing.T) {
}

func TestTopStakers_RotateCommittee(t *testing.T) {
committeeStore := epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)
committeeStore := epochstore.NewStore(kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)

topStakersSeatManager := &SeatManager{
apiProvider: api.SingleVersionProvider(tpkg.TestAPI),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ func (t *TestSuite) InitPerformanceTracker() {
return p, nil
}

rewardsStore := epochstore.NewEpochKVStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0)
poolStatsStore := epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*model.PoolsStats).Bytes, model.PoolsStatsFromBytes)
committeeStore := epochstore.NewStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)
committeeCandidatesStore := epochstore.NewEpochKVStore(kvstore.Realm{}, kvstore.Realm{}, mapdb.NewMapDB(), 0)
rewardsStore := epochstore.NewEpochKVStore(kvstore.Realm{}, mapdb.NewMapDB(), 0)
poolStatsStore := epochstore.NewStore(kvstore.Realm{}, mapdb.NewMapDB(), 0, (*model.PoolsStats).Bytes, model.PoolsStatsFromBytes)
committeeStore := epochstore.NewStore(kvstore.Realm{}, mapdb.NewMapDB(), 0, (*account.Accounts).Bytes, account.AccountsFromBytes)
committeeCandidatesStore := epochstore.NewEpochKVStore(kvstore.Realm{}, mapdb.NewMapDB(), 0)

t.Instance = NewTracker(
rewardsStore.GetEpoch,
Expand Down
68 changes: 46 additions & 22 deletions pkg/storage/permanent/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
snapshotImportedKey = iota
latestCommitmentKey
latestFinalizedSlotKey
latestStoredSlotKey
protocolVersionEpochMappingKey
futureProtocolParametersKey
protocolParametersKey
Expand All @@ -34,6 +35,7 @@ type Settings struct {
storeSnapshotImported *kvstore.TypedValue[bool]
storeLatestCommitment *kvstore.TypedValue[*model.Commitment]
storeLatestFinalizedSlot *kvstore.TypedValue[iotago.SlotIndex]
storeLatestProcessedSlot *kvstore.TypedValue[iotago.SlotIndex]
storeLatestIssuedValidationBlock *kvstore.TypedValue[*model.Block]
storeProtocolVersionEpochMapping *kvstore.TypedStore[iotago.Version, iotago.EpochIndex]
storeFutureProtocolParameters *kvstore.TypedStore[iotago.Version, *types.Tuple[iotago.EpochIndex, iotago.Identifier]]
Expand Down Expand Up @@ -83,6 +85,12 @@ func NewSettings(store kvstore.KVStore, opts ...options.Option[api.EpochBasedPro
iotago.SlotIndex.Bytes,
iotago.SlotIndexFromBytes,
),
storeLatestProcessedSlot: kvstore.NewTypedValue(
store,
[]byte{latestStoredSlotKey},
iotago.SlotIndex.Bytes,
iotago.SlotIndexFromBytes,
),
storeLatestIssuedValidationBlock: kvstore.NewTypedValue(
store,
[]byte{latestIssuedValidationBlock},
Expand Down Expand Up @@ -285,7 +293,7 @@ func (s *Settings) LatestFinalizedSlot() iotago.SlotIndex {
s.mutex.RLock()
defer s.mutex.RUnlock()

return s.latestFinalizedSlot()
return read(s.storeLatestFinalizedSlot)
}

func (s *Settings) SetLatestFinalizedSlot(slot iotago.SlotIndex) (err error) {
Expand All @@ -295,23 +303,42 @@ func (s *Settings) SetLatestFinalizedSlot(slot iotago.SlotIndex) (err error) {
return s.storeLatestFinalizedSlot.Set(slot)
}

func (s *Settings) latestFinalizedSlot() iotago.SlotIndex {
latestFinalizedSlot, err := s.storeLatestFinalizedSlot.Get()
if err != nil {
if ierrors.Is(err, kvstore.ErrKeyNotFound) {
return 0
func (s *Settings) LatestStoredSlot() iotago.SlotIndex {
s.mutex.RLock()
defer s.mutex.RUnlock()

return read(s.storeLatestProcessedSlot)
}

func (s *Settings) SetLatestStoredSlot(slot iotago.SlotIndex) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()

return s.storeLatestProcessedSlot.Set(slot)
}

func (s *Settings) AdvanceLatestStoredSlot(slot iotago.SlotIndex) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()

if _, err = s.storeLatestProcessedSlot.Compute(func(latestStoredSlot iotago.SlotIndex, _ bool) (newValue iotago.SlotIndex, err error) {
if latestStoredSlot >= slot {
return latestStoredSlot, kvstore.ErrTypedValueNotChanged
}
panic(err)

return slot, nil
}); err != nil {
return ierrors.Wrap(err, "failed to advance latest stored slot")
}

return latestFinalizedSlot
return nil
}

func (s *Settings) LatestIssuedValidationBlock() *model.Block {
s.mutex.RLock()
defer s.mutex.RUnlock()

return s.latestIssuedValidationBlock()
return read(s.storeLatestIssuedValidationBlock)
}

func (s *Settings) SetLatestIssuedValidationBlock(block *model.Block) (err error) {
Expand All @@ -321,18 +348,6 @@ func (s *Settings) SetLatestIssuedValidationBlock(block *model.Block) (err error
return s.storeLatestIssuedValidationBlock.Set(block)
}

func (s *Settings) latestIssuedValidationBlock() *model.Block {
block, err := s.storeLatestIssuedValidationBlock.Get()
if err != nil {
if ierrors.Is(err, kvstore.ErrKeyNotFound) {
return nil
}
panic(err)
}

return block
}

func (s *Settings) Export(writer io.WriteSeeker, targetCommitment *iotago.Commitment) error {
var commitmentBytes []byte
var err error
Expand Down Expand Up @@ -579,7 +594,7 @@ func (s *Settings) String() string {
builder := stringify.NewStructBuilder("Settings")
builder.AddField(stringify.NewStructField("IsSnapshotImported", lo.PanicOnErr(s.storeSnapshotImported.Has())))
builder.AddField(stringify.NewStructField("LatestCommitment", s.latestCommitment()))
builder.AddField(stringify.NewStructField("LatestFinalizedSlot", s.latestFinalizedSlot()))
builder.AddField(stringify.NewStructField("LatestFinalizedSlot", read(s.storeLatestFinalizedSlot)))
if err := s.storeProtocolParameters.Iterate(kvstore.EmptyPrefix, func(version iotago.Version, protocolParams iotago.ProtocolParameters) bool {
builder.AddField(stringify.NewStructField(fmt.Sprintf("ProtocolParameters(%d)", version), protocolParams))

Expand All @@ -590,3 +605,12 @@ func (s *Settings) String() string {

return builder.String()
}

func read[T any](typedValue *kvstore.TypedValue[T]) (value T) {
value, err := typedValue.Get()
if err != nil && !ierrors.Is(err, kvstore.ErrKeyNotFound) {
panic(err)
}

return value
}
18 changes: 11 additions & 7 deletions pkg/storage/prunable/bucket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,22 @@ func (b *BucketManager) DeleteBucket(epoch iotago.EpochIndex) (deleted bool) {
return true
}

// RollbackBucket removes data in the bucket in slots [targetSlotIndex+1; epochEndSlot].
func (b *BucketManager) RollbackBucket(epoch iotago.EpochIndex, targetSlot, epochEndSlot iotago.SlotIndex) error {
oldBucketKvStore := b.getDBInstance(epoch).KVStore()
for clearSlot := targetSlot + 1; clearSlot <= epochEndSlot; clearSlot++ {
// delete slot prefix from forkedPrunable storage that will be eventually copied into the new engine
if err := oldBucketKvStore.DeletePrefix(clearSlot.MustBytes()); err != nil {
return ierrors.Wrapf(err, "error while clearing slot %d in bucket for epoch %d", clearSlot, epoch)
// PruneSlots prunes the data of all slots in the range [from, to] in the given epoch.
func (b *BucketManager) PruneSlots(epoch iotago.EpochIndex, pruningRange [2]iotago.SlotIndex) error {
epochStore := b.getDBInstance(epoch).KVStore()

for slot := pruningRange[0]; slot <= pruningRange[1]; slot++ {
if err := epochStore.DeletePrefix(slot.MustBytes()); err != nil {
return ierrors.Wrapf(err, "error while clearing slot %d in bucket for epoch %d", slot, epoch)
}
}

// shutting down the storage does not prevent this storage from being used again and only forces a flush.
b.Shutdown()

return nil
}

func (b *BucketManager) Flush() error {
b.openDBsMutex.RLock()
defer b.openDBsMutex.RUnlock()
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/prunable/epochstore/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package epochstore

const (
entriesKey byte = iota
lastAccessedEpochKey
lastPrunedEpochKey
)
49 changes: 43 additions & 6 deletions pkg/storage/prunable/epochstore/epoch_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ type EpochKVStore struct {
kv kvstore.KVStore
pruningDelay iotago.EpochIndex

lastPrunedEpoch *model.PruningIndex
lastAccessedEpoch *kvstore.TypedValue[iotago.EpochIndex]
lastPrunedEpoch *model.PruningIndex
}

func NewEpochKVStore(storeRealm, pruningRealm kvstore.Realm, kv kvstore.KVStore, pruningDelay iotago.EpochIndex) *EpochKVStore {
func NewEpochKVStore(storeRealm kvstore.Realm, kv kvstore.KVStore, pruningDelay iotago.EpochIndex) *EpochKVStore {

return &EpochKVStore{
realm: storeRealm,
kv: lo.PanicOnErr(kv.WithExtendedRealm(storeRealm)),
pruningDelay: pruningDelay,
lastPrunedEpoch: model.NewPruningIndex(lo.PanicOnErr(kv.WithExtendedRealm(pruningRealm)), storeRealm),
realm: storeRealm,
kv: lo.PanicOnErr(kv.WithExtendedRealm(append(storeRealm, entriesKey))),
pruningDelay: pruningDelay,
lastAccessedEpoch: kvstore.NewTypedValue(kv, append(storeRealm, lastAccessedEpochKey), iotago.EpochIndex.Bytes, iotago.EpochIndexFromBytes),
lastPrunedEpoch: model.NewPruningIndex(lo.PanicOnErr(kv.WithExtendedRealm(storeRealm)), kvstore.Realm{lastPrunedEpochKey}),
}
}

Expand All @@ -37,11 +39,31 @@ func (e *EpochKVStore) RestoreLastPrunedEpoch() error {
return e.lastPrunedEpoch.RestoreFromDisk()
}

func (e *EpochKVStore) LastAccessedEpoch() (lastAccessedEpoch iotago.EpochIndex, err error) {
if lastAccessedEpoch, err = e.lastAccessedEpoch.Get(); err != nil {
if !ierrors.Is(err, kvstore.ErrKeyNotFound) {
err = ierrors.Wrap(err, "failed to get last accessed epoch")
} else {
err = nil
}
}

return lastAccessedEpoch, err
}

func (e *EpochKVStore) LastPrunedEpoch() (iotago.EpochIndex, bool) {
return e.lastPrunedEpoch.Index()
}

func (e *EpochKVStore) GetEpoch(epoch iotago.EpochIndex) (kvstore.KVStore, error) {
_, _ = e.lastAccessedEpoch.Compute(func(lastAccessedEpoch iotago.EpochIndex, exists bool) (newValue iotago.EpochIndex, err error) {
if lastAccessedEpoch >= epoch {
return lastAccessedEpoch, kvstore.ErrTypedValueNotChanged
}

return epoch, nil
})

if e.isTooOld(epoch) {
return nil, ierrors.Wrapf(database.ErrEpochPruned, "epoch %d is too old", epoch)
}
Expand Down Expand Up @@ -81,3 +103,18 @@ func (e *EpochKVStore) Prune(epoch iotago.EpochIndex, defaultPruningDelay iotago

return nil
}

func (e *EpochKVStore) RollbackEpochs(epoch iotago.EpochIndex) (lastPrunedEpoch iotago.EpochIndex, err error) {
lastAccessedEpoch, err := e.LastAccessedEpoch()
if err != nil {
return lastAccessedEpoch, ierrors.Wrap(err, "failed to get last accessed epoch")
}

for epochToPrune := epoch; epochToPrune <= lastAccessedEpoch; epochToPrune++ {
if err = e.DeleteEpoch(epochToPrune); err != nil {
return epochToPrune, ierrors.Wrapf(err, "error while deleting epoch %d", epochToPrune)
}
}

return lastAccessedEpoch, nil
}
61 changes: 49 additions & 12 deletions pkg/storage/prunable/epochstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,36 @@ type Store[V any] struct {
kv *kvstore.TypedStore[iotago.EpochIndex, V]
pruningDelay iotago.EpochIndex

lastPrunedEpoch *model.PruningIndex
lastAccessedEpoch *kvstore.TypedValue[iotago.EpochIndex]
lastPrunedEpoch *model.PruningIndex
}

func NewStore[V any](storeRealm, pruningRealm kvstore.Realm, kv kvstore.KVStore, pruningDelay iotago.EpochIndex, vToBytes kvstore.ObjectToBytes[V], bytesToV kvstore.BytesToObject[V]) *Store[V] {
func NewStore[V any](storeRealm kvstore.Realm, kv kvstore.KVStore, pruningDelay iotago.EpochIndex, vToBytes kvstore.ObjectToBytes[V], bytesToV kvstore.BytesToObject[V]) *Store[V] {
return &Store[V]{
realm: storeRealm,
kv: kvstore.NewTypedStore(lo.PanicOnErr(kv.WithExtendedRealm(storeRealm)), iotago.EpochIndex.Bytes, iotago.EpochIndexFromBytes, vToBytes, bytesToV),
pruningDelay: pruningDelay,
lastPrunedEpoch: model.NewPruningIndex(lo.PanicOnErr(kv.WithExtendedRealm(pruningRealm)), storeRealm),
realm: storeRealm,
kv: kvstore.NewTypedStore(lo.PanicOnErr(kv.WithExtendedRealm(append(storeRealm, entriesKey))), iotago.EpochIndex.Bytes, iotago.EpochIndexFromBytes, vToBytes, bytesToV),
pruningDelay: pruningDelay,
lastAccessedEpoch: kvstore.NewTypedValue(kv, append(storeRealm, lastAccessedEpochKey), iotago.EpochIndex.Bytes, iotago.EpochIndexFromBytes),
lastPrunedEpoch: model.NewPruningIndex(lo.PanicOnErr(kv.WithExtendedRealm(storeRealm)), kvstore.Realm{lastPrunedEpochKey}),
}
}

func (s *Store[V]) isTooOld(epoch iotago.EpochIndex) bool {
prunedEpoch, hasPruned := s.lastPrunedEpoch.Index()

return hasPruned && epoch <= prunedEpoch
}

func (s *Store[V]) RestoreLastPrunedEpoch() error {
return s.lastPrunedEpoch.RestoreFromDisk()
}

func (s *Store[V]) LastAccessedEpoch() (lastAccessedEpoch iotago.EpochIndex, err error) {
if lastAccessedEpoch, err = s.lastAccessedEpoch.Get(); err != nil {
if !ierrors.Is(err, kvstore.ErrKeyNotFound) {
err = ierrors.Wrap(err, "failed to get last accessed epoch")
} else {
err = nil
}
}

return lastAccessedEpoch, err
}

func (s *Store[V]) LastPrunedEpoch() (iotago.EpochIndex, bool) {
return s.lastPrunedEpoch.Index()
}
Expand All @@ -61,6 +69,14 @@ func (s *Store[V]) Load(epoch iotago.EpochIndex) (V, error) {
}

func (s *Store[V]) Store(epoch iotago.EpochIndex, value V) error {
_, _ = s.lastAccessedEpoch.Compute(func(lastAccessedEpoch iotago.EpochIndex, exists bool) (newValue iotago.EpochIndex, err error) {
if lastAccessedEpoch >= epoch {
return lastAccessedEpoch, kvstore.ErrTypedValueNotChanged
}

return epoch, nil
})

if s.isTooOld(epoch) {
return ierrors.Wrapf(database.ErrEpochPruned, "epoch %d is too old", epoch)
}
Expand Down Expand Up @@ -133,3 +149,24 @@ func (s *Store[V]) Prune(epoch iotago.EpochIndex, defaultPruningDelay iotago.Epo

return nil
}

func (s *Store[V]) RollbackEpochs(epoch iotago.EpochIndex) (lastPrunedEpoch iotago.EpochIndex, err error) {
lastAccessedEpoch, err := s.LastAccessedEpoch()
if err != nil {
return lastAccessedEpoch, ierrors.Wrap(err, "failed to get last accessed epoch")
}

for epochToPrune := epoch; epochToPrune <= lastAccessedEpoch; epochToPrune++ {
if err = s.DeleteEpoch(epochToPrune); err != nil {
return epochToPrune, ierrors.Wrapf(err, "error while deleting epoch %d", epochToPrune)
}
}

return lastAccessedEpoch, nil
}

func (s *Store[V]) isTooOld(epoch iotago.EpochIndex) bool {
prunedEpoch, hasPruned := s.lastPrunedEpoch.Index()

return hasPruned && epoch <= prunedEpoch
}
Loading

0 comments on commit 097daf3

Please sign in to comment.