Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement logic to clone and rollback storage for engine switching. #334

Merged
merged 20 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
7df3ffc
Implement logic to clone and rollback storage for engine switching.
piotrm50 Sep 6, 2023
d33e959
Implement unit test for cloning storage.
piotrm50 Sep 6, 2023
7d626c3
Debugging attestation divergence after fork WIP
piotrm50 Sep 8, 2023
23a831e
Fix the engine switching test.
piotrm50 Sep 8, 2023
735a768
Cleanup code
piotrm50 Sep 8, 2023
8766ee8
Start working on locking and openable KVStore
piotrm50 Sep 11, 2023
dca3d21
Merge remote-tracking branch 'origin/develop' into feat/copy-forked-s…
piotrm50 Sep 11, 2023
fcffbeb
Merge remote-tracking branch 'origin/develop' into feat/copy-forked-s…
piotrm50 Sep 13, 2023
71306db
Improve error message
piotrm50 Sep 13, 2023
35bab8f
Implement syncedKVstore that allows re-opening a kvstore without affe…
piotrm50 Sep 13, 2023
a47e754
Implement openableKVStore to enable using healthtracker while holding…
piotrm50 Sep 13, 2023
dab5eea
Merge remote-tracking branch 'origin/develop' into feat/copy-forked-s…
piotrm50 Sep 13, 2023
964caac
Improve storage copy tests.
piotrm50 Sep 14, 2023
258b217
Improve comments.
piotrm50 Sep 14, 2023
9d3b7c8
Implement unit tests for creating a rolledback forked engine
piotrm50 Sep 14, 2023
ca2f8f4
Even mocked PoA needs to be imported to obtain pools' data
karimodm Sep 19, 2023
3a3b48e
Address some review comments
karimodm Sep 19, 2023
a4ca65d
Address all the remaining review comments
karimodm Sep 19, 2023
fd800e3
Unused dbVersion in base Storage constructor
karimodm Sep 19, 2023
c237d8c
fumpt
karimodm Sep 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ require (
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/onsi/ginkgo/v2 v2.12.0 // indirect
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/otiai10/copy v1.12.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
github.com/orcaman/writerseeker v0.0.0-20200621085525-1d3f536ff85e h1:s2RNOM/IGdY0Y6qfTeUKhDawdHDpK9RGBdx80qN4Ttw=
github.com/orcaman/writerseeker v0.0.0-20200621085525-1d3f536ff85e/go.mod h1:nBdnFKj15wFbf94Rwfq4m30eAcyY9V/IyKAGQFtqkW0=
github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY=
github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c h1:Gcce/r5tSQeprxswXXOwQ/RBU1bjQWVd9dB7QKoPXBE=
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/block_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (b *BlockDispatcher) Dispatch(block *model.Block, src peer.ID) error {
func (b *BlockDispatcher) initEngineMonitoring() {
b.monitorLatestEngineCommitment(b.protocol.MainEngineInstance())

b.protocol.engineManager.OnEngineCreated(b.monitorLatestEngineCommitment)
b.protocol.EngineManager.OnEngineCreated(b.monitorLatestEngineCommitment)

b.protocol.Events.ChainManager.CommitmentPublished.Hook(func(chainCommitment *chainmanager.ChainCommitment) {
// as soon as a commitment is solid, it's chain is known and it can be dispatched
Expand Down
43 changes: 43 additions & 0 deletions pkg/protocol/engine/accounts/accountsledger/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/module"
"github.com/iotaledger/hive.go/runtime/options"
"github.com/iotaledger/hive.go/runtime/syncutils"
Expand Down Expand Up @@ -271,6 +272,48 @@ func (m *Manager) PastAccounts(accountIDs iotago.AccountIDs, targetIndex iotago.
return result, nil
}

func (m *Manager) Rollback(targetIndex iotago.SlotIndex) error {
for index := m.latestCommittedSlot; index > targetIndex; index-- {
slotDiff := lo.PanicOnErr(m.slotDiff(index))
var internalErr error

if err := slotDiff.Stream(func(accountID iotago.AccountID, accountDiff *model.AccountDiff, destroyed bool) bool {
accountData, exists, err := m.accountsTree.Get(accountID)
if err != nil {
internalErr = ierrors.Wrapf(err, "unable to retrieve account %s to rollback in slot %d", accountID, index)

return false
}

if !exists {
accountData = accounts.NewAccountData(accountID)
}

if _, err := m.rollbackAccountTo(accountData, targetIndex); err != nil {
internalErr = ierrors.Wrapf(err, "unable to rollback account %s to target slot index %d", accountID, targetIndex)

return false
}

if err := m.accountsTree.Set(accountID, accountData); err != nil {
internalErr = ierrors.Wrapf(err, "failed to save rolled back account %s to target slot index %d", accountID, targetIndex)

return false
}

return true
}); err != nil {
return ierrors.Wrapf(err, "error in streaming account diffs for slot %s", index)
}

if internalErr != nil {
return ierrors.Wrapf(internalErr, "error in rolling back account for slot %s", index)
}
}

return nil
}

// AddAccount adds a new account to the Account tree, allotting to it the balance on the given output.
// The Account will be created associating the given output as the latest state of the account.
func (m *Manager) AddAccount(output *utxoledger.Output, blockIssuanceCredits iotago.BlockIssuanceCredits) error {
Expand Down
2 changes: 2 additions & 0 deletions pkg/protocol/engine/accounts/accountsledger/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ func (m *Manager) readSlotDiffs(reader io.ReadSeeker, slotDiffCount uint64) erro
func (m *Manager) writeSlotDiffs(pWriter *utils.PositionedWriter, targetIndex iotago.SlotIndex) (slotDiffsCount uint64, err error) {
// write slot diffs until being able to reach targetIndex, where the exported tree is at
slotIndex := iotago.SlotIndex(1)

// TODO: shouldn't that be from last finalized slot?
karimodm marked this conversation as resolved.
Show resolved Hide resolved
karimodm marked this conversation as resolved.
Show resolved Hide resolved
maxCommittableAge := m.apiProvider.APIForSlot(targetIndex).ProtocolParameters().MaxCommittableAge()
if targetIndex > maxCommittableAge {
slotIndex = targetIndex - maxCommittableAge
Expand Down
1 change: 1 addition & 0 deletions pkg/protocol/engine/attestation/attestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Attestations interface {

Import(reader io.ReadSeeker) (err error)
Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex) (err error)
Rollback(index iotago.SlotIndex) (err error)

RestoreFromDisk() (err error)

Expand Down
34 changes: 34 additions & 0 deletions pkg/protocol/engine/attestation/slotattestation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,40 @@ func (m *Manager) Commit(index iotago.SlotIndex) (newCW uint64, attestationsRoot
return m.lastCumulativeWeight, iotago.Identifier(tree.Root()), nil
}

// Rollback rolls back the component state as if the last committed slot was targetSlot.
// It populates pendingAttestation store with previously committed attestations in order to create correct commitment in the future.
// As it modifies in-memory storage, it should only be called on the target engine as calling it on a temporary component will have no effect.
func (m *Manager) Rollback(targetSlot iotago.SlotIndex) error {
m.commitmentMutex.RLock()
defer m.commitmentMutex.RUnlock()

if targetSlot > m.lastCommittedSlot {
return ierrors.Errorf("slot %d is newer than last committed slot %d", targetSlot, m.lastCommittedSlot)
}
attestationSlotIndex, isValid := m.computeAttestationCommitmentOffset(targetSlot)
if !isValid {
return nil
}

// We only need to export the committed attestations at targetSlot as these contain all the attestations for the
// slots of targetSlot - attestationCommitmentOffset to targetSlot. This is sufficient to reconstruct the pending attestations
// for targetSlot+1.
attestationsStorage, err := m.attestationsForSlot(targetSlot)
if err != nil {
return ierrors.Wrapf(err, "failed to get attestations of slot %d", targetSlot)
}

if err = attestationsStorage.Stream(func(key iotago.AccountID, value *iotago.Attestation) error {
m.applyToPendingAttestations(value, attestationSlotIndex)

return nil
}); err != nil {
return ierrors.Wrapf(err, "failed to stream attestations of slot %d", targetSlot)
}

return nil
}

func (m *Manager) computeAttestationCommitmentOffset(slot iotago.SlotIndex) (cutoffIndex iotago.SlotIndex, isValid bool) {
if slot < m.apiProvider.APIForSlot(slot).ProtocolParameters().MaxCommittableAge() {
return 0, false
Expand Down
29 changes: 29 additions & 0 deletions pkg/protocol/engine/eviction/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,35 @@ func (s *State) Import(reader io.ReadSeeker) error {
return nil
}

func (s *State) Rollback(lowerTarget, targetIndex iotago.SlotIndex) error {
s.evictionMutex.RLock()
defer s.evictionMutex.RUnlock()

start, _ := s.delayedBlockEvictionThreshold(lowerTarget)
latestNonEmptySlot := iotago.SlotIndex(0)

for currentSlot := start; currentSlot <= targetIndex; currentSlot++ {
_, err := s.rootBlockStorageFunc(currentSlot)
if err != nil {
continue
}

latestNonEmptySlot = currentSlot
}

if latestNonEmptySlot > s.optsRootBlocksEvictionDelay {
latestNonEmptySlot -= s.optsRootBlocksEvictionDelay
} else {
latestNonEmptySlot = 0
}

if err := s.latestNonEmptyStore.Set([]byte{latestNonEmptySlotKey}, latestNonEmptySlot.MustBytes()); err != nil {
return ierrors.Wrap(err, "failed to store latest non empty slot")
}

return nil
}

// PopulateFromStorage populates the root blocks from the storage.
func (s *State) PopulateFromStorage(latestCommitmentIndex iotago.SlotIndex) {
for index := lo.Return1(s.delayedBlockEvictionThreshold(latestCommitmentIndex)); index <= latestCommitmentIndex; index++ {
Expand Down
36 changes: 32 additions & 4 deletions pkg/protocol/engine/utxoledger/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,13 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er
// Get all UTXOs and sort them by outputID
outputIDs, err := m.UnspentOutputsIDs(ReadLockLedger(false))
if err != nil {
return err
return ierrors.Wrap(err, "error while retrieving unspent outputIDs")
}

for _, outputID := range outputIDs.RemoveDupsAndSort() {
output, err := m.ReadOutputByOutputIDWithoutLocking(outputID)
if err != nil {
return err
return ierrors.Wrapf(err, "error while retrieving output %s", outputID)
}

if err := utils.WriteBytesFunc(writer, output.SnapshotBytes(), &relativeCountersPosition); err != nil {
Expand All @@ -274,12 +274,12 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er
for diffIndex := ledgerIndex; diffIndex > targetIndex; diffIndex-- {
slotDiff, err := m.SlotDiffWithoutLocking(diffIndex)
if err != nil {
return err
return ierrors.Wrapf(err, "error while retrieving slot diffs for slot %s", diffIndex)
}

written, err := WriteSlotDiffToSnapshotWriter(writer, slotDiff)
if err != nil {
return err
return ierrors.Wrapf(err, "error while writing slot diffs for slot %s", diffIndex)
}

relativeCountersPosition += written
Expand Down Expand Up @@ -312,3 +312,31 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er

return nil
}

// Rollback rolls back ledger state to the given target slot.
func (m *Manager) Rollback(targetSlot iotago.SlotIndex) error {
m.WriteLockLedger()
defer m.WriteUnlockLedger()

ledgerIndex, err := m.ReadLedgerIndexWithoutLocking()
if err != nil {
return err
}

for diffIndex := ledgerIndex; diffIndex > targetSlot; diffIndex-- {
slotDiff, err := m.SlotDiffWithoutLocking(diffIndex)
if err != nil {
return err
}

if err := m.RollbackDiffWithoutLocking(slotDiff.Index, slotDiff.Outputs, slotDiff.Spents); err != nil {
return err
}
}

if err := m.stateTree.Commit(); err != nil {
return ierrors.Wrap(err, "unable to commit state tree")
}

return nil
}
Loading
Loading