Skip to content

Commit

Permalink
Merge pull request #334 from iotaledger/feat/copy-forked-storage
Browse files Browse the repository at this point in the history
Implement logic to clone and rollback storage for engine switching.
  • Loading branch information
karimodm authored Sep 20, 2023
2 parents 8241799 + c237d8c commit 80d6b07
Show file tree
Hide file tree
Showing 34 changed files with 1,798 additions and 87 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ require (
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/onsi/ginkgo/v2 v2.12.0 // indirect
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/otiai10/copy v1.12.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
github.com/orcaman/writerseeker v0.0.0-20200621085525-1d3f536ff85e h1:s2RNOM/IGdY0Y6qfTeUKhDawdHDpK9RGBdx80qN4Ttw=
github.com/orcaman/writerseeker v0.0.0-20200621085525-1d3f536ff85e/go.mod h1:nBdnFKj15wFbf94Rwfq4m30eAcyY9V/IyKAGQFtqkW0=
github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY=
github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c h1:Gcce/r5tSQeprxswXXOwQ/RBU1bjQWVd9dB7QKoPXBE=
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (b *BlockDispatcher) Dispatch(block *model.Block, src peer.ID) error {
func (b *BlockDispatcher) initEngineMonitoring() {
b.monitorLatestEngineCommitment(b.protocol.MainEngineInstance())

b.protocol.engineManager.OnEngineCreated(b.monitorLatestEngineCommitment)
b.protocol.EngineManager.OnEngineCreated(b.monitorLatestEngineCommitment)

b.protocol.Events.ChainManager.CommitmentPublished.Hook(func(chainCommitment *chainmanager.ChainCommitment) {
// as soon as a commitment is solid, it's chain is known and it can be dispatched
Expand Down
43 changes: 43 additions & 0 deletions pkg/protocol/engine/accounts/accountsledger/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/module"
"github.com/iotaledger/hive.go/runtime/options"
"github.com/iotaledger/hive.go/runtime/syncutils"
Expand Down Expand Up @@ -271,6 +272,48 @@ func (m *Manager) PastAccounts(accountIDs iotago.AccountIDs, targetIndex iotago.
return result, nil
}

func (m *Manager) Rollback(targetIndex iotago.SlotIndex) error {
for index := m.latestCommittedSlot; index > targetIndex; index-- {
slotDiff := lo.PanicOnErr(m.slotDiff(index))
var internalErr error

if err := slotDiff.Stream(func(accountID iotago.AccountID, accountDiff *model.AccountDiff, destroyed bool) bool {
accountData, exists, err := m.accountsTree.Get(accountID)
if err != nil {
internalErr = ierrors.Wrapf(err, "unable to retrieve account %s to rollback in slot %d", accountID, index)

return false
}

if !exists {
accountData = accounts.NewAccountData(accountID)
}

if _, err := m.rollbackAccountTo(accountData, targetIndex); err != nil {
internalErr = ierrors.Wrapf(err, "unable to rollback account %s to target slot index %d", accountID, targetIndex)

return false
}

if err := m.accountsTree.Set(accountID, accountData); err != nil {
internalErr = ierrors.Wrapf(err, "failed to save rolled back account %s to target slot index %d", accountID, targetIndex)

return false
}

return true
}); err != nil {
return ierrors.Wrapf(err, "error in streaming account diffs for slot %s", index)
}

if internalErr != nil {
return ierrors.Wrapf(internalErr, "error in rolling back account for slot %s", index)
}
}

return nil
}

// AddAccount adds a new account to the Account tree, allotting to it the balance on the given output.
// The Account will be created associating the given output as the latest state of the account.
func (m *Manager) AddAccount(output *utxoledger.Output, blockIssuanceCredits iotago.BlockIssuanceCredits) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/protocol/engine/accounts/accountsledger/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (m *Manager) writeSlotDiffs(pWriter *utils.PositionedWriter, targetIndex io
// write slot diffs until being able to reach targetIndex, where the exported tree is at
slotIndex := iotago.SlotIndex(1)
maxCommittableAge := m.apiProvider.APIForSlot(targetIndex).ProtocolParameters().MaxCommittableAge()

if targetIndex > maxCommittableAge {
slotIndex = targetIndex - maxCommittableAge
}
Expand Down
1 change: 1 addition & 0 deletions pkg/protocol/engine/attestation/attestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Attestations interface {

Import(reader io.ReadSeeker) (err error)
Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex) (err error)
Rollback(index iotago.SlotIndex) (err error)

RestoreFromDisk() (err error)

Expand Down
34 changes: 34 additions & 0 deletions pkg/protocol/engine/attestation/slotattestation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,40 @@ func (m *Manager) Commit(index iotago.SlotIndex) (newCW uint64, attestationsRoot
return m.lastCumulativeWeight, iotago.Identifier(tree.Root()), nil
}

// Rollback rolls back the component state as if the last committed slot was targetSlot.
// It populates pendingAttestation store with previously committed attestations in order to create correct commitment in the future.
// As it modifies in-memory storage, it should only be called on the target engine as calling it on a temporary component will have no effect.
func (m *Manager) Rollback(targetSlot iotago.SlotIndex) error {
m.commitmentMutex.RLock()
defer m.commitmentMutex.RUnlock()

if targetSlot > m.lastCommittedSlot {
return ierrors.Errorf("slot %d is newer than last committed slot %d", targetSlot, m.lastCommittedSlot)
}
attestationSlotIndex, isValid := m.computeAttestationCommitmentOffset(targetSlot)
if !isValid {
return nil
}

// We only need to export the committed attestations at targetSlot as these contain all the attestations for the
// slots of targetSlot - attestationCommitmentOffset to targetSlot. This is sufficient to reconstruct the pending attestations
// for targetSlot+1.
attestationsStorage, err := m.attestationsForSlot(targetSlot)
if err != nil {
return ierrors.Wrapf(err, "failed to get attestations of slot %d", targetSlot)
}

if err = attestationsStorage.Stream(func(key iotago.AccountID, value *iotago.Attestation) error {
m.applyToPendingAttestations(value, attestationSlotIndex)

return nil
}); err != nil {
return ierrors.Wrapf(err, "failed to stream attestations of slot %d", targetSlot)
}

return nil
}

func (m *Manager) computeAttestationCommitmentOffset(slot iotago.SlotIndex) (cutoffIndex iotago.SlotIndex, isValid bool) {
if slot < m.apiProvider.APIForSlot(slot).ProtocolParameters().MaxCommittableAge() {
return 0, false
Expand Down
29 changes: 29 additions & 0 deletions pkg/protocol/engine/eviction/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,35 @@ func (s *State) Import(reader io.ReadSeeker) error {
return nil
}

func (s *State) Rollback(lowerTarget, targetIndex iotago.SlotIndex) error {
s.evictionMutex.RLock()
defer s.evictionMutex.RUnlock()

start, _ := s.delayedBlockEvictionThreshold(lowerTarget)
latestNonEmptySlot := iotago.SlotIndex(0)

for currentSlot := start; currentSlot <= targetIndex; currentSlot++ {
_, err := s.rootBlockStorageFunc(currentSlot)
if err != nil {
continue
}

latestNonEmptySlot = currentSlot
}

if latestNonEmptySlot > s.optsRootBlocksEvictionDelay {
latestNonEmptySlot -= s.optsRootBlocksEvictionDelay
} else {
latestNonEmptySlot = 0
}

if err := s.latestNonEmptyStore.Set([]byte{latestNonEmptySlotKey}, latestNonEmptySlot.MustBytes()); err != nil {
return ierrors.Wrap(err, "failed to store latest non empty slot")
}

return nil
}

// PopulateFromStorage populates the root blocks from the storage.
func (s *State) PopulateFromStorage(latestCommitmentIndex iotago.SlotIndex) {
for index := lo.Return1(s.delayedBlockEvictionThreshold(latestCommitmentIndex)); index <= latestCommitmentIndex; index++ {
Expand Down
36 changes: 32 additions & 4 deletions pkg/protocol/engine/utxoledger/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,13 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er
// Get all UTXOs and sort them by outputID
outputIDs, err := m.UnspentOutputsIDs(ReadLockLedger(false))
if err != nil {
return err
return ierrors.Wrap(err, "error while retrieving unspent outputIDs")
}

for _, outputID := range outputIDs.RemoveDupsAndSort() {
output, err := m.ReadOutputByOutputIDWithoutLocking(outputID)
if err != nil {
return err
return ierrors.Wrapf(err, "error while retrieving output %s", outputID)
}

if err := utils.WriteBytesFunc(writer, output.SnapshotBytes(), &relativeCountersPosition); err != nil {
Expand All @@ -274,12 +274,12 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er
for diffIndex := ledgerIndex; diffIndex > targetIndex; diffIndex-- {
slotDiff, err := m.SlotDiffWithoutLocking(diffIndex)
if err != nil {
return err
return ierrors.Wrapf(err, "error while retrieving slot diffs for slot %s", diffIndex)
}

written, err := WriteSlotDiffToSnapshotWriter(writer, slotDiff)
if err != nil {
return err
return ierrors.Wrapf(err, "error while writing slot diffs for slot %s", diffIndex)
}

relativeCountersPosition += written
Expand Down Expand Up @@ -312,3 +312,31 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er

return nil
}

// Rollback rolls back ledger state to the given target slot.
func (m *Manager) Rollback(targetSlot iotago.SlotIndex) error {
m.WriteLockLedger()
defer m.WriteUnlockLedger()

ledgerIndex, err := m.ReadLedgerIndexWithoutLocking()
if err != nil {
return err
}

for diffIndex := ledgerIndex; diffIndex > targetSlot; diffIndex-- {
slotDiff, err := m.SlotDiffWithoutLocking(diffIndex)
if err != nil {
return err
}

if err := m.RollbackDiffWithoutLocking(slotDiff.Index, slotDiff.Outputs, slotDiff.Spents); err != nil {
return err
}
}

if err := m.stateTree.Commit(); err != nil {
return ierrors.Wrap(err, "unable to commit state tree")
}

return nil
}
Loading

0 comments on commit 80d6b07

Please sign in to comment.