diff --git a/go.mod b/go.mod index 7b6135a3d..d65f1919b 100644 --- a/go.mod +++ b/go.mod @@ -134,6 +134,7 @@ require ( github.com/multiformats/go-multistream v0.4.1 // indirect github.com/onsi/ginkgo/v2 v2.12.0 // indirect github.com/opencontainers/runtime-spec v1.1.0 // indirect + github.com/otiai10/copy v1.12.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect diff --git a/go.sum b/go.sum index d2bb86a59..dbd5e19bf 100644 --- a/go.sum +++ b/go.sum @@ -509,6 +509,8 @@ github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYr github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= github.com/orcaman/writerseeker v0.0.0-20200621085525-1d3f536ff85e h1:s2RNOM/IGdY0Y6qfTeUKhDawdHDpK9RGBdx80qN4Ttw= github.com/orcaman/writerseeker v0.0.0-20200621085525-1d3f536ff85e/go.mod h1:nBdnFKj15wFbf94Rwfq4m30eAcyY9V/IyKAGQFtqkW0= +github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY= +github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pasztorpisti/qs v0.0.0-20171216220353-8d6c33ee906c h1:Gcce/r5tSQeprxswXXOwQ/RBU1bjQWVd9dB7QKoPXBE= diff --git a/pkg/protocol/block_dispatcher.go b/pkg/protocol/block_dispatcher.go index 74362e6f7..92c18337d 100644 --- a/pkg/protocol/block_dispatcher.go +++ b/pkg/protocol/block_dispatcher.go @@ -101,7 +101,7 @@ func (b *BlockDispatcher) Dispatch(block *model.Block, src peer.ID) error { func (b *BlockDispatcher) initEngineMonitoring() { b.monitorLatestEngineCommitment(b.protocol.MainEngineInstance()) - b.protocol.engineManager.OnEngineCreated(b.monitorLatestEngineCommitment) + b.protocol.EngineManager.OnEngineCreated(b.monitorLatestEngineCommitment) b.protocol.Events.ChainManager.CommitmentPublished.Hook(func(chainCommitment *chainmanager.ChainCommitment) { // as soon as a commitment is solid, it's chain is known and it can be dispatched diff --git a/pkg/protocol/engine/accounts/accountsledger/manager.go b/pkg/protocol/engine/accounts/accountsledger/manager.go index 5bf108a72..45590c3ff 100644 --- a/pkg/protocol/engine/accounts/accountsledger/manager.go +++ b/pkg/protocol/engine/accounts/accountsledger/manager.go @@ -8,6 +8,7 @@ import ( "github.com/iotaledger/hive.go/ds/shrinkingmap" "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" + "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/hive.go/runtime/module" "github.com/iotaledger/hive.go/runtime/options" "github.com/iotaledger/hive.go/runtime/syncutils" @@ -271,6 +272,48 @@ func (m *Manager) PastAccounts(accountIDs iotago.AccountIDs, targetIndex iotago. 
return result, nil } +func (m *Manager) Rollback(targetIndex iotago.SlotIndex) error { + for index := m.latestCommittedSlot; index > targetIndex; index-- { + slotDiff := lo.PanicOnErr(m.slotDiff(index)) + var internalErr error + + if err := slotDiff.Stream(func(accountID iotago.AccountID, accountDiff *model.AccountDiff, destroyed bool) bool { + accountData, exists, err := m.accountsTree.Get(accountID) + if err != nil { + internalErr = ierrors.Wrapf(err, "unable to retrieve account %s to rollback in slot %d", accountID, index) + + return false + } + + if !exists { + accountData = accounts.NewAccountData(accountID) + } + + if _, err := m.rollbackAccountTo(accountData, targetIndex); err != nil { + internalErr = ierrors.Wrapf(err, "unable to rollback account %s to target slot index %d", accountID, targetIndex) + + return false + } + + if err := m.accountsTree.Set(accountID, accountData); err != nil { + internalErr = ierrors.Wrapf(err, "failed to save rolled back account %s to target slot index %d", accountID, targetIndex) + + return false + } + + return true + }); err != nil { + return ierrors.Wrapf(err, "error in streaming account diffs for slot %s", index) + } + + if internalErr != nil { + return ierrors.Wrapf(internalErr, "error in rolling back account for slot %s", index) + } + } + + return nil +} + // AddAccount adds a new account to the Account tree, allotting to it the balance on the given output. // The Account will be created associating the given output as the latest state of the account. func (m *Manager) AddAccount(output *utxoledger.Output, blockIssuanceCredits iotago.BlockIssuanceCredits) error { diff --git a/pkg/protocol/engine/accounts/accountsledger/snapshot.go b/pkg/protocol/engine/accounts/accountsledger/snapshot.go index f54122791..fe3063e49 100644 --- a/pkg/protocol/engine/accounts/accountsledger/snapshot.go +++ b/pkg/protocol/engine/accounts/accountsledger/snapshot.go @@ -216,6 +216,7 @@ func (m *Manager) writeSlotDiffs(pWriter *utils.PositionedWriter, targetIndex io // write slot diffs until being able to reach targetIndex, where the exported tree is at slotIndex := iotago.SlotIndex(1) maxCommittableAge := m.apiProvider.APIForSlot(targetIndex).ProtocolParameters().MaxCommittableAge() + if targetIndex > maxCommittableAge { slotIndex = targetIndex - maxCommittableAge } diff --git a/pkg/protocol/engine/attestation/attestations.go b/pkg/protocol/engine/attestation/attestations.go index 74bd0dc94..f860b95d9 100644 --- a/pkg/protocol/engine/attestation/attestations.go +++ b/pkg/protocol/engine/attestation/attestations.go @@ -22,6 +22,7 @@ type Attestations interface { Import(reader io.ReadSeeker) (err error) Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex) (err error) + Rollback(index iotago.SlotIndex) (err error) RestoreFromDisk() (err error) diff --git a/pkg/protocol/engine/attestation/slotattestation/manager.go b/pkg/protocol/engine/attestation/slotattestation/manager.go index 3779e7753..f72592eed 100644 --- a/pkg/protocol/engine/attestation/slotattestation/manager.go +++ b/pkg/protocol/engine/attestation/slotattestation/manager.go @@ -275,6 +275,40 @@ func (m *Manager) Commit(index iotago.SlotIndex) (newCW uint64, attestationsRoot return m.lastCumulativeWeight, iotago.Identifier(tree.Root()), nil } +// Rollback rolls back the component state as if the last committed slot was targetSlot. +// It populates pendingAttestation store with previously committed attestations in order to create correct commitment in the future. 
+// As it modifies in-memory storage, it should only be called on the target engine as calling it on a temporary component will have no effect. +func (m *Manager) Rollback(targetSlot iotago.SlotIndex) error { + m.commitmentMutex.RLock() + defer m.commitmentMutex.RUnlock() + + if targetSlot > m.lastCommittedSlot { + return ierrors.Errorf("slot %d is newer than last committed slot %d", targetSlot, m.lastCommittedSlot) + } + attestationSlotIndex, isValid := m.computeAttestationCommitmentOffset(targetSlot) + if !isValid { + return nil + } + + // We only need to export the committed attestations at targetSlot as these contain all the attestations for the + // slots of targetSlot - attestationCommitmentOffset to targetSlot. This is sufficient to reconstruct the pending attestations + // for targetSlot+1. + attestationsStorage, err := m.attestationsForSlot(targetSlot) + if err != nil { + return ierrors.Wrapf(err, "failed to get attestations of slot %d", targetSlot) + } + + if err = attestationsStorage.Stream(func(key iotago.AccountID, value *iotago.Attestation) error { + m.applyToPendingAttestations(value, attestationSlotIndex) + + return nil + }); err != nil { + return ierrors.Wrapf(err, "failed to stream attestations of slot %d", targetSlot) + } + + return nil +} + func (m *Manager) computeAttestationCommitmentOffset(slot iotago.SlotIndex) (cutoffIndex iotago.SlotIndex, isValid bool) { if slot < m.apiProvider.APIForSlot(slot).ProtocolParameters().MaxCommittableAge() { return 0, false diff --git a/pkg/protocol/engine/eviction/state.go b/pkg/protocol/engine/eviction/state.go index 0e55d69a2..c5ba5ef25 100644 --- a/pkg/protocol/engine/eviction/state.go +++ b/pkg/protocol/engine/eviction/state.go @@ -278,6 +278,35 @@ func (s *State) Import(reader io.ReadSeeker) error { return nil } +func (s *State) Rollback(lowerTarget, targetIndex iotago.SlotIndex) error { + s.evictionMutex.RLock() + defer s.evictionMutex.RUnlock() + + start, _ := s.delayedBlockEvictionThreshold(lowerTarget) + latestNonEmptySlot := iotago.SlotIndex(0) + + for currentSlot := start; currentSlot <= targetIndex; currentSlot++ { + _, err := s.rootBlockStorageFunc(currentSlot) + if err != nil { + continue + } + + latestNonEmptySlot = currentSlot + } + + if latestNonEmptySlot > s.optsRootBlocksEvictionDelay { + latestNonEmptySlot -= s.optsRootBlocksEvictionDelay + } else { + latestNonEmptySlot = 0 + } + + if err := s.latestNonEmptyStore.Set([]byte{latestNonEmptySlotKey}, latestNonEmptySlot.MustBytes()); err != nil { + return ierrors.Wrap(err, "failed to store latest non empty slot") + } + + return nil +} + // PopulateFromStorage populates the root blocks from the storage. 
func (s *State) PopulateFromStorage(latestCommitmentIndex iotago.SlotIndex) { for index := lo.Return1(s.delayedBlockEvictionThreshold(latestCommitmentIndex)); index <= latestCommitmentIndex; index++ { diff --git a/pkg/protocol/engine/utxoledger/snapshot.go b/pkg/protocol/engine/utxoledger/snapshot.go index dbd68d7a1..02e0ab24f 100644 --- a/pkg/protocol/engine/utxoledger/snapshot.go +++ b/pkg/protocol/engine/utxoledger/snapshot.go @@ -255,13 +255,13 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er // Get all UTXOs and sort them by outputID outputIDs, err := m.UnspentOutputsIDs(ReadLockLedger(false)) if err != nil { - return err + return ierrors.Wrap(err, "error while retrieving unspent outputIDs") } for _, outputID := range outputIDs.RemoveDupsAndSort() { output, err := m.ReadOutputByOutputIDWithoutLocking(outputID) if err != nil { - return err + return ierrors.Wrapf(err, "error while retrieving output %s", outputID) } if err := utils.WriteBytesFunc(writer, output.SnapshotBytes(), &relativeCountersPosition); err != nil { @@ -274,12 +274,12 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er for diffIndex := ledgerIndex; diffIndex > targetIndex; diffIndex-- { slotDiff, err := m.SlotDiffWithoutLocking(diffIndex) if err != nil { - return err + return ierrors.Wrapf(err, "error while retrieving slot diffs for slot %s", diffIndex) } written, err := WriteSlotDiffToSnapshotWriter(writer, slotDiff) if err != nil { - return err + return ierrors.Wrapf(err, "error while writing slot diffs for slot %s", diffIndex) } relativeCountersPosition += written @@ -312,3 +312,31 @@ func (m *Manager) Export(writer io.WriteSeeker, targetIndex iotago.SlotIndex) er return nil } + +// Rollback rolls back ledger state to the given target slot. 
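+// It applies the stored slot diffs in reverse order, from the current ledger index down to
+// targetSlot+1, and then commits the state tree so that the persisted ledger state matches targetSlot.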
+func (m *Manager) Rollback(targetSlot iotago.SlotIndex) error { + m.WriteLockLedger() + defer m.WriteUnlockLedger() + + ledgerIndex, err := m.ReadLedgerIndexWithoutLocking() + if err != nil { + return err + } + + for diffIndex := ledgerIndex; diffIndex > targetSlot; diffIndex-- { + slotDiff, err := m.SlotDiffWithoutLocking(diffIndex) + if err != nil { + return err + } + + if err := m.RollbackDiffWithoutLocking(slotDiff.Index, slotDiff.Outputs, slotDiff.Spents); err != nil { + return err + } + } + + if err := m.stateTree.Commit(); err != nil { + return ierrors.Wrap(err, "unable to commit state tree") + } + + return nil +} diff --git a/pkg/protocol/enginemanager/enginemanager.go b/pkg/protocol/enginemanager/enginemanager.go index 97deba261..96da59c00 100644 --- a/pkg/protocol/enginemanager/enginemanager.go +++ b/pkg/protocol/enginemanager/enginemanager.go @@ -1,7 +1,6 @@ package enginemanager import ( - "fmt" "os" "path/filepath" @@ -15,14 +14,17 @@ import ( "github.com/iotaledger/hive.go/runtime/options" "github.com/iotaledger/hive.go/runtime/workerpool" "github.com/iotaledger/iota-core/pkg/protocol/engine" + "github.com/iotaledger/iota-core/pkg/protocol/engine/accounts/accountsledger" "github.com/iotaledger/iota-core/pkg/protocol/engine/attestation" "github.com/iotaledger/iota-core/pkg/protocol/engine/blockdag" + "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" "github.com/iotaledger/iota-core/pkg/protocol/engine/booker" "github.com/iotaledger/iota-core/pkg/protocol/engine/clock" "github.com/iotaledger/iota-core/pkg/protocol/engine/commitmentfilter" "github.com/iotaledger/iota-core/pkg/protocol/engine/congestioncontrol/scheduler" "github.com/iotaledger/iota-core/pkg/protocol/engine/consensus/blockgadget" "github.com/iotaledger/iota-core/pkg/protocol/engine/consensus/slotgadget" + "github.com/iotaledger/iota-core/pkg/protocol/engine/eviction" "github.com/iotaledger/iota-core/pkg/protocol/engine/filter" "github.com/iotaledger/iota-core/pkg/protocol/engine/ledger" "github.com/iotaledger/iota-core/pkg/protocol/engine/notarization" @@ -43,8 +45,6 @@ type engineInfo struct { Name string `json:"name"` } -// region EngineManager //////////////////////////////////////////////////////////////////////////////////////////////// - type EngineManager struct { directory *utils.Directory dbVersion byte @@ -138,13 +138,14 @@ func (e *EngineManager) LoadActiveEngine(snapshotPath string) (*engine.Engine, e if len(info.Name) > 0 { if exists, isDirectory, err := ioutils.PathExists(e.directory.Path(info.Name)); err == nil && exists && isDirectory { // Load previous engine as active - e.activeInstance = e.loadEngineInstance(info.Name, snapshotPath) + e.activeInstance = e.loadEngineInstanceFromSnapshot(info.Name, snapshotPath) } } if e.activeInstance == nil { // Start with a new instance and set to active - instance := e.newEngineInstance(snapshotPath) + instance := e.loadEngineInstanceFromSnapshot(lo.PanicOnErr(uuid.NewUUID()).String(), snapshotPath) + if err := e.SetActiveInstance(instance); err != nil { return nil, err } @@ -191,14 +192,24 @@ func (e *EngineManager) SetActiveInstance(instance *engine.Engine) error { return ioutils.WriteJSONToFile(e.infoFilePath(), info, 0o644) } -func (e *EngineManager) loadEngineInstance(dirName string, snapshotPath string) *engine.Engine { +func (e *EngineManager) loadEngineInstanceFromSnapshot(engineAlias string, snapshotPath string) *engine.Engine { errorHandler := func(err error) { - e.errorHandler(ierrors.Wrapf(err, "engine (%s)", dirName[0:8])) + 
e.errorHandler(ierrors.Wrapf(err, "engine (%s)", engineAlias[0:8])) } - newEngine := engine.New(e.workers.CreateGroup(dirName), + e.engineOptions = append(e.engineOptions, engine.WithSnapshotPath(snapshotPath)) + + return e.loadEngineInstanceWithStorage(engineAlias, storage.Create(e.directory.Path(engineAlias), e.dbVersion, errorHandler, e.storageOptions...)) +} + +func (e *EngineManager) loadEngineInstanceWithStorage(engineAlias string, storage *storage.Storage) *engine.Engine { + errorHandler := func(err error) { + e.errorHandler(ierrors.Wrapf(err, "engine (%s)", engineAlias[0:8])) + } + + newEngine := engine.New(e.workers.CreateGroup(engineAlias), errorHandler, - storage.New(e.directory.Path(dirName), e.dbVersion, errorHandler, e.storageOptions...), + storage, e.filterProvider, e.commitmentFilterProvider, e.blockDAGProvider, @@ -216,7 +227,7 @@ func (e *EngineManager) loadEngineInstance(dirName string, snapshotPath string) e.retainerProvider, e.upgradeOrchestratorProvider, e.syncManagerProvider, - append(e.engineOptions, engine.WithSnapshotPath(snapshotPath))..., + e.engineOptions..., ) e.engineCreated.Trigger(newEngine) @@ -224,23 +235,69 @@ func (e *EngineManager) loadEngineInstance(dirName string, snapshotPath string) return newEngine } -func (e *EngineManager) newEngineInstance(snapshotPath string) *engine.Engine { - dirName := lo.PanicOnErr(uuid.NewUUID()).String() - return e.loadEngineInstance(dirName, snapshotPath) -} - func (e *EngineManager) ForkEngineAtSlot(index iotago.SlotIndex) (*engine.Engine, error) { - // Dump a snapshot at the target index - snapshotPath := filepath.Join(os.TempDir(), fmt.Sprintf("snapshot_%d_%s.bin", index, lo.PanicOnErr(uuid.NewUUID()))) - if err := e.activeInstance.WriteSnapshot(snapshotPath, index); err != nil { - return nil, ierrors.Wrapf(err, "error exporting snapshot for index %s", index) + engineAlias := newEngineAlias() + errorHandler := func(err error) { + e.errorHandler(ierrors.Wrapf(err, "engine (%s)", engineAlias[0:8])) + } + + // Copy raw data on disk. + newStorage, err := storage.Clone(e.activeInstance.Storage, e.directory.Path(engineAlias), e.dbVersion, errorHandler, e.storageOptions...) + if err != nil { + return nil, ierrors.Wrapf(err, "failed to copy storage from active engine instance (%s) to new engine instance (%s)", e.activeInstance.Storage.Directory(), e.directory.Path(engineAlias)) + } + + // Remove commitments that after forking point. + latestCommitment := newStorage.Settings().LatestCommitment() + if err := newStorage.Commitments().Rollback(index, latestCommitment.Index()); err != nil { + return nil, ierrors.Wrap(err, "failed to rollback commitments") + } + // Create temporary components and rollback their permanent state, which will be reflected on disk. 
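+ // Eviction state and the accounts ledger are rolled back through temporary components backed by
+ // the cloned storage; the ledger, settings and prunable buckets are then rolled back directly on
+ // that storage before the candidate engine is instantiated (attestations follow, as they live in memory).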
+ evictionState := eviction.NewState(newStorage.LatestNonEmptySlot(), newStorage.RootBlocks) + evictionState.Initialize(latestCommitment.Index()) + + blockCache := blocks.New(evictionState, newStorage.Settings().APIProvider()) + accountsManager := accountsledger.New(newStorage.Settings().APIProvider(), blockCache.Block, newStorage.AccountDiffs, newStorage.Accounts()) + + accountsManager.SetLatestCommittedSlot(latestCommitment.Index()) + if err := accountsManager.Rollback(index); err != nil { + return nil, ierrors.Wrap(err, "failed to rollback accounts manager") + } + + if err := evictionState.Rollback(newStorage.Settings().LatestFinalizedSlot(), index); err != nil { + return nil, ierrors.Wrap(err, "failed to rollback eviction state") + } + if err := newStorage.Ledger().Rollback(index); err != nil { + return nil, err + } + + targetCommitment, err := newStorage.Commitments().Load(index) + if err != nil { + return nil, ierrors.Wrapf(err, "error while retrieving commitment for target index %d", index) + } + + if err := newStorage.Settings().Rollback(targetCommitment); err != nil { + return nil, err + } + + if err := newStorage.RollbackPrunable(index); err != nil { + return nil, err } - return e.newEngineInstance(snapshotPath), nil + candidateEngine := e.loadEngineInstanceWithStorage(engineAlias, newStorage) + + // Rollback attestations already on created engine instance, because this action modifies the in-memory storage. + if err := candidateEngine.Attestations.Rollback(index); err != nil { + return nil, ierrors.Wrap(err, "error while rolling back attestations storage on candidate engine") + } + + return candidateEngine, nil } func (e *EngineManager) OnEngineCreated(handler func(*engine.Engine)) (unsubscribe func()) { return e.engineCreated.Hook(handler).Unhook } -// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// +func newEngineAlias() string { + return lo.PanicOnErr(uuid.NewUUID()).String() +} diff --git a/pkg/protocol/protocol.go b/pkg/protocol/protocol.go index 08bfa692f..f458cff09 100644 --- a/pkg/protocol/protocol.go +++ b/pkg/protocol/protocol.go @@ -59,7 +59,7 @@ type Protocol struct { context context.Context Events *Events BlockDispatcher *BlockDispatcher - engineManager *enginemanager.EngineManager + EngineManager *enginemanager.EngineManager ChainManager *chainmanager.Manager Workers *workerpool.Group @@ -196,7 +196,7 @@ func (p *Protocol) shutdown() { } func (p *Protocol) initEngineManager() { - p.engineManager = enginemanager.New( + p.EngineManager = enginemanager.New( p.Workers.CreateGroup("EngineManager"), p.HandleError, p.optsBaseDirectory, @@ -222,7 +222,7 @@ func (p *Protocol) initEngineManager() { p.optsSyncManagerProvider, ) - mainEngine, err := p.engineManager.LoadActiveEngine(p.optsSnapshotPath) + mainEngine, err := p.EngineManager.LoadActiveEngine(p.optsSnapshotPath) if err != nil { panic(fmt.Sprintf("could not load active engine: %s", err)) } diff --git a/pkg/protocol/protocol_fork.go b/pkg/protocol/protocol_fork.go index 2ee83ae43..0f37fc308 100644 --- a/pkg/protocol/protocol_fork.go +++ b/pkg/protocol/protocol_fork.go @@ -85,7 +85,7 @@ func (p *Protocol) onForkDetected(fork *chainmanager.Fork) { // 2. The candidate engine never becomes synced or its chain is not heavier than the main chain -> discard it after a timeout. // 3. The candidate engine is not creating the same commitments as the chain we decided to switch to -> discard it immediately. 
snapshotTargetIndex := fork.ForkingPoint.Index() - 1 - candidateEngineInstance, err := p.engineManager.ForkEngineAtSlot(snapshotTargetIndex) + candidateEngineInstance, err := p.EngineManager.ForkEngineAtSlot(snapshotTargetIndex) if err != nil { p.HandleError(ierrors.Wrap(err, "error creating new candidate engine")) return @@ -310,7 +310,7 @@ func (p *Protocol) switchEngines() { return false } - if err := p.engineManager.SetActiveInstance(candidateEngineInstance); err != nil { + if err := p.EngineManager.SetActiveInstance(candidateEngineInstance); err != nil { p.HandleError(ierrors.Wrap(err, "error switching engines")) return false @@ -331,8 +331,6 @@ func (p *Protocol) switchEngines() { if success { p.Events.MainEngineSwitched.Trigger(p.MainEngineInstance()) - // TODO: copy over old slots from the old engine to the new one - // Cleanup filesystem if err := oldEngine.RemoveFromFilesystem(); err != nil { p.HandleError(ierrors.Wrap(err, "error removing storage directory after switching engines")) diff --git a/pkg/protocol/snapshotcreator/snapshotcreator.go b/pkg/protocol/snapshotcreator/snapshotcreator.go index e12ce74cd..a14add174 100644 --- a/pkg/protocol/snapshotcreator/snapshotcreator.go +++ b/pkg/protocol/snapshotcreator/snapshotcreator.go @@ -54,7 +54,7 @@ func CreateSnapshot(opts ...options.Option[Options]) error { workers := workerpool.NewGroup("CreateSnapshot") defer workers.Shutdown() - s := storage.New(lo.PanicOnErr(os.MkdirTemp(os.TempDir(), "*")), opt.DataBaseVersion, errorHandler) + s := storage.Create(lo.PanicOnErr(os.MkdirTemp(os.TempDir(), "*")), opt.DataBaseVersion, errorHandler) defer s.Shutdown() if err := s.Settings().StoreProtocolParametersForStartEpoch(opt.ProtocolParameters, 0); err != nil { diff --git a/pkg/protocol/sybilprotection/seatmanager/mock/mockseatmanager.go b/pkg/protocol/sybilprotection/seatmanager/mock/mockseatmanager.go index ba10e8baa..223f212df 100644 --- a/pkg/protocol/sybilprotection/seatmanager/mock/mockseatmanager.go +++ b/pkg/protocol/sybilprotection/seatmanager/mock/mockseatmanager.go @@ -119,7 +119,9 @@ func (m *ManualPOA) RotateCommittee(_ iotago.EpochIndex, _ *account.Accounts) *a func (m *ManualPOA) SetCommittee(_ iotago.EpochIndex, _ *account.Accounts) { } -func (m *ManualPOA) ImportCommittee(_ iotago.EpochIndex, _ *account.Accounts) { +func (m *ManualPOA) ImportCommittee(_ iotago.EpochIndex, validators *account.Accounts) { + m.accounts = validators + m.committee = m.accounts.SelectCommittee(validators.IDs()...) 
} func (m *ManualPOA) Shutdown() {} diff --git a/pkg/protocol/sybilprotection/sybilprotectionv1/performance/performance.go b/pkg/protocol/sybilprotection/sybilprotectionv1/performance/performance.go index d5e5048da..3a0613676 100644 --- a/pkg/protocol/sybilprotection/sybilprotectionv1/performance/performance.go +++ b/pkg/protocol/sybilprotection/sybilprotectionv1/performance/performance.go @@ -92,7 +92,6 @@ func (t *Tracker) ApplyEpoch(epoch iotago.EpochIndex, committee *account.Account timeProvider := t.apiProvider.APIForEpoch(epoch).TimeProvider() epochStartSlot := timeProvider.EpochStart(epoch) epochEndSlot := timeProvider.EpochEnd(epoch) - profitMargin := calculateProfitMargin(committee.TotalValidatorStake(), committee.TotalStake()) poolsStats := &model.PoolsStats{ TotalStake: committee.TotalStake(), diff --git a/pkg/protocol/sybilprotection/sybilprotectionv1/sybilprotection.go b/pkg/protocol/sybilprotection/sybilprotectionv1/sybilprotection.go index 59ca80c96..fe3de89f2 100644 --- a/pkg/protocol/sybilprotection/sybilprotectionv1/sybilprotection.go +++ b/pkg/protocol/sybilprotection/sybilprotectionv1/sybilprotection.go @@ -87,8 +87,10 @@ func NewProvider(opts ...options.Option[SybilProtection]) module.Provider[*engin panic("failed to load committee for last finalized slot to initialize sybil protection") } o.seatManager.ImportCommittee(currentEpoch, committee) + fmt.Println("committee import", committee.TotalStake(), currentEpoch) if nextCommittee, nextCommitteeExists := o.performanceTracker.LoadCommitteeForEpoch(currentEpoch + 1); nextCommitteeExists { o.seatManager.ImportCommittee(currentEpoch+1, nextCommittee) + fmt.Println("next committee", nextCommittee.TotalStake(), currentEpoch+1) } o.TriggerInitialized() @@ -134,7 +136,7 @@ func (o *SybilProtection) CommitSlot(slot iotago.SlotIndex) (committeeRoot, rewa } committee.SetReused() - + fmt.Println("reuse committee", currentEpoch, "stake", committee.TotalValidatorStake()) o.seatManager.SetCommittee(nextEpoch, committee) o.events.CommitteeSelected.Trigger(committee, nextEpoch) @@ -247,6 +249,7 @@ func (o *SybilProtection) slotFinalized(slot iotago.SlotIndex) { if slot+apiForSlot.ProtocolParameters().EpochNearingThreshold() == epochEndSlot && epochEndSlot > o.lastCommittedSlot+apiForSlot.ProtocolParameters().MaxCommittableAge() { newCommittee := o.selectNewCommittee(slot) + fmt.Println("new committee selection finalization", epoch, newCommittee.TotalStake(), newCommittee.TotalValidatorStake()) o.events.CommitteeSelected.Trigger(newCommittee, epoch+1) } } diff --git a/pkg/storage/database/db_instance.go b/pkg/storage/database/db_instance.go index 671b073e1..14cdabeaf 100644 --- a/pkg/storage/database/db_instance.go +++ b/pkg/storage/database/db_instance.go @@ -3,11 +3,13 @@ package database import ( "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" + "github.com/iotaledger/hive.go/lo" ) type DBInstance struct { - store kvstore.KVStore // KVStore that is used to access the DB instance + store *lockedKVStore // KVStore that is used to access the DB instance healthTracker *kvstore.StoreHealthTracker + dbConfig Config } func NewDBInstance(dbConfig Config) *DBInstance { @@ -15,7 +17,12 @@ func NewDBInstance(dbConfig Config) *DBInstance { if err != nil { panic(err) } - storeHealthTracker, err := kvstore.NewStoreHealthTracker(db, dbConfig.PrefixHealth, dbConfig.Version, nil) + + lockableKVStore := newLockedKVStore(db) + + // HealthTracker state is only modified while holding the lock on the lockableKVStore; + // 
that's why it needs to use openableKVStore (which does not lock) instead of lockableKVStore to avoid a deadlock. + storeHealthTracker, err := kvstore.NewStoreHealthTracker(lockableKVStore.openableKVStore, dbConfig.PrefixHealth, dbConfig.Version, nil) if err != nil { panic(ierrors.Wrapf(err, "database in %s is corrupted, delete database and resync node", dbConfig.Directory)) } @@ -24,20 +31,47 @@ func NewDBInstance(dbConfig Config) *DBInstance { } return &DBInstance{ - store: db, + store: lockableKVStore, healthTracker: storeHealthTracker, + dbConfig: dbConfig, } } func (d *DBInstance) Close() { + d.store.Lock() + defer d.store.Unlock() + + d.CloseWithoutLocking() +} + +func (d *DBInstance) CloseWithoutLocking() { if err := d.healthTracker.MarkHealthy(); err != nil { panic(err) } + if err := FlushAndClose(d.store); err != nil { panic(err) } } +// Open re-opens a closed DBInstance. It must only be called while holding a lock on DBInstance, +// otherwise it might cause a race condition and corruption of node's state. +func (d *DBInstance) Open() { + d.store.Replace(lo.PanicOnErr(StoreWithDefaultSettings(d.dbConfig.Directory, false, d.dbConfig.Engine))) + + if err := d.healthTracker.MarkCorrupted(); err != nil { + panic(err) + } +} + +func (d *DBInstance) Lock() { + d.store.Lock() +} + +func (d *DBInstance) Unlock() { + d.store.Unlock() +} + func (d *DBInstance) KVStore() kvstore.KVStore { return d.store } diff --git a/pkg/storage/database/lockedkvstore.go b/pkg/storage/database/lockedkvstore.go new file mode 100644 index 000000000..cb365f46e --- /dev/null +++ b/pkg/storage/database/lockedkvstore.go @@ -0,0 +1,172 @@ +package database + +import ( + "github.com/iotaledger/hive.go/ds/types" + "github.com/iotaledger/hive.go/ierrors" + "github.com/iotaledger/hive.go/kvstore" + "github.com/iotaledger/hive.go/runtime/syncutils" + "github.com/iotaledger/hive.go/serializer/v2/byteutils" +) + +type lockedKVStore struct { + *openableKVStore + + instanceMutex *syncutils.RWMutex +} + +func newLockedKVStore(storeInstance kvstore.KVStore) *lockedKVStore { + return &lockedKVStore{ + openableKVStore: newOpenableKVStore(storeInstance), + instanceMutex: new(syncutils.RWMutex), + } +} + +func (s *lockedKVStore) Lock() { + s.instanceMutex.Lock() +} + +func (s *lockedKVStore) Unlock() { + s.instanceMutex.Unlock() +} + +func (s *lockedKVStore) WithRealm(realm kvstore.Realm) (kvstore.KVStore, error) { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.withRealm(realm) +} + +func (s *lockedKVStore) withRealm(realm kvstore.Realm) (kvstore.KVStore, error) { + return &lockedKVStore{ + openableKVStore: &openableKVStore{ + storeInstance: nil, + parentStore: s.openableKVStore, + dbPrefix: realm, + }, + + instanceMutex: s.instanceMutex, + }, nil +} + +func (s *lockedKVStore) WithExtendedRealm(realm kvstore.Realm) (kvstore.KVStore, error) { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.withRealm(s.buildKeyPrefix(realm)) +} + +func (s *lockedKVStore) Iterate(prefix kvstore.KeyPrefix, kvConsumerFunc kvstore.IteratorKeyValueConsumerFunc, direction ...kvstore.IterDirection) error { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.openableKVStore.Iterate(prefix, kvConsumerFunc, direction...) 
+} + +func (s *lockedKVStore) IterateKeys(prefix kvstore.KeyPrefix, consumerFunc kvstore.IteratorKeyConsumerFunc, direction ...kvstore.IterDirection) error { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.openableKVStore.IterateKeys(prefix, consumerFunc, direction...) +} + +func (s *lockedKVStore) Clear() error { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.openableKVStore.Clear() +} + +func (s *lockedKVStore) Get(key kvstore.Key) (value kvstore.Value, err error) { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.openableKVStore.Get(key) +} + +func (s *lockedKVStore) Set(key kvstore.Key, value kvstore.Value) error { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.openableKVStore.Set(key, value) +} + +func (s *lockedKVStore) Has(key kvstore.Key) (bool, error) { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.openableKVStore.Has(key) +} + +func (s *lockedKVStore) Delete(key kvstore.Key) error { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.openableKVStore.Delete(key) +} + +func (s *lockedKVStore) DeletePrefix(prefix kvstore.KeyPrefix) error { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.openableKVStore.DeletePrefix(prefix) +} + +func (s *lockedKVStore) Flush() error { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return s.FlushWithoutLocking() +} + +func (s *lockedKVStore) FlushWithoutLocking() error { + return s.openableKVStore.Flush() +} + +func (s *lockedKVStore) Close() error { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + if err := s.FlushWithoutLocking(); err != nil { + return ierrors.Wrap(err, "failed to flush database") + } + + return s.CloseWithoutLocking() +} + +func (s *lockedKVStore) CloseWithoutLocking() error { + return s.openableKVStore.Close() +} + +func (s *lockedKVStore) Batched() (kvstore.BatchedMutations, error) { + s.instanceMutex.RLock() + defer s.instanceMutex.RUnlock() + + return &syncedBatchedMutations{ + openableKVStoreBatchedMutations: &openableKVStoreBatchedMutations{ + parentStore: s.openableKVStore, + dbPrefix: s.dbPrefix, + setOperations: make(map[string]kvstore.Value), + deleteOperations: make(map[string]types.Empty), + }, + + parentStore: s, + }, nil +} + +// builds a key usable using the realm and the given prefix. 
+func (s *lockedKVStore) buildKeyPrefix(prefix kvstore.KeyPrefix) kvstore.KeyPrefix { + return byteutils.ConcatBytes(s.dbPrefix, prefix) +} + +type syncedBatchedMutations struct { + *openableKVStoreBatchedMutations + + parentStore *lockedKVStore +} + +func (s *syncedBatchedMutations) Commit() error { + s.parentStore.instanceMutex.RLock() + defer s.parentStore.instanceMutex.RUnlock() + + return s.openableKVStoreBatchedMutations.Commit() +} diff --git a/pkg/storage/database/openablekvstore.go b/pkg/storage/database/openablekvstore.go new file mode 100644 index 000000000..9ff04df3a --- /dev/null +++ b/pkg/storage/database/openablekvstore.go @@ -0,0 +1,181 @@ +package database + +import ( + "sync" + + "github.com/iotaledger/hive.go/ds/types" + "github.com/iotaledger/hive.go/kvstore" + "github.com/iotaledger/hive.go/kvstore/utils" + "github.com/iotaledger/hive.go/serializer/v2/byteutils" +) + +type openableKVStore struct { + storeInstance kvstore.KVStore // KVStore that is used to access the DB instance + parentStore *openableKVStore + dbPrefix kvstore.KeyPrefix +} + +func newOpenableKVStore(storeInstance kvstore.KVStore) *openableKVStore { + return &openableKVStore{ + storeInstance: storeInstance, + parentStore: nil, + dbPrefix: kvstore.EmptyPrefix, + } +} + +func (s *openableKVStore) instance() kvstore.KVStore { + if s.storeInstance != nil { + return s.storeInstance + } + + return s.parentStore.instance() +} + +func (s *openableKVStore) Replace(newKVStore kvstore.KVStore) { + if s.storeInstance == nil { + s.parentStore.Replace(newKVStore) + + return + } + + s.storeInstance = newKVStore +} + +func (s *openableKVStore) WithRealm(realm kvstore.Realm) (kvstore.KVStore, error) { + return s.withRealm(realm) +} +func (s *openableKVStore) withRealm(realm kvstore.Realm) (kvstore.KVStore, error) { + return &openableKVStore{ + storeInstance: nil, + parentStore: s, + dbPrefix: realm, + }, nil +} +func (s *openableKVStore) WithExtendedRealm(realm kvstore.Realm) (kvstore.KVStore, error) { + return s.withRealm(s.buildKeyPrefix(realm)) +} + +func (s *openableKVStore) Realm() kvstore.Realm { + return s.dbPrefix +} + +func (s *openableKVStore) Iterate(prefix kvstore.KeyPrefix, kvConsumerFunc kvstore.IteratorKeyValueConsumerFunc, direction ...kvstore.IterDirection) error { + return s.instance().Iterate(s.buildKeyPrefix(prefix), func(key kvstore.Key, value kvstore.Value) bool { + return kvConsumerFunc(utils.CopyBytes(key)[len(s.dbPrefix):], value) + }, direction...) +} + +func (s *openableKVStore) IterateKeys(prefix kvstore.KeyPrefix, consumerFunc kvstore.IteratorKeyConsumerFunc, direction ...kvstore.IterDirection) error { + return s.instance().IterateKeys(s.buildKeyPrefix(prefix), func(key kvstore.Key) bool { + return consumerFunc(utils.CopyBytes(key)[len(s.dbPrefix):]) + }, direction...) 
+} + +func (s *openableKVStore) Clear() error { + return s.instance().DeletePrefix(s.dbPrefix) +} + +func (s *openableKVStore) Get(key kvstore.Key) (value kvstore.Value, err error) { + return s.instance().Get(byteutils.ConcatBytes(s.dbPrefix, key)) +} + +func (s *openableKVStore) Set(key kvstore.Key, value kvstore.Value) error { + return s.instance().Set(byteutils.ConcatBytes(s.dbPrefix, key), value) +} + +func (s *openableKVStore) Has(key kvstore.Key) (bool, error) { + return s.instance().Has(byteutils.ConcatBytes(s.dbPrefix, key)) +} + +func (s *openableKVStore) Delete(key kvstore.Key) error { + return s.instance().Delete(byteutils.ConcatBytes(s.dbPrefix, key)) +} + +func (s *openableKVStore) DeletePrefix(prefix kvstore.KeyPrefix) error { + return s.instance().DeletePrefix(s.buildKeyPrefix(prefix)) +} + +func (s *openableKVStore) Flush() error { + return s.instance().Flush() +} +func (s *openableKVStore) Close() error { + return s.instance().Close() +} + +func (s *openableKVStore) Batched() (kvstore.BatchedMutations, error) { + return &openableKVStoreBatchedMutations{ + parentStore: s, + dbPrefix: s.dbPrefix, + setOperations: make(map[string]kvstore.Value), + deleteOperations: make(map[string]types.Empty), + }, nil +} + +// builds a key usable using the realm and the given prefix. +func (s *openableKVStore) buildKeyPrefix(prefix kvstore.KeyPrefix) kvstore.KeyPrefix { + return byteutils.ConcatBytes(s.dbPrefix, prefix) +} + +type openableKVStoreBatchedMutations struct { + parentStore *openableKVStore + dbPrefix kvstore.KeyPrefix + setOperations map[string]kvstore.Value + deleteOperations map[string]types.Empty + operationsMutex sync.Mutex +} + +func (s *openableKVStoreBatchedMutations) Set(key kvstore.Key, value kvstore.Value) error { + stringKey := byteutils.ConcatBytesToString(s.dbPrefix, key) + + s.operationsMutex.Lock() + defer s.operationsMutex.Unlock() + + delete(s.deleteOperations, stringKey) + s.setOperations[stringKey] = value + + return nil +} + +func (s *openableKVStoreBatchedMutations) Delete(key kvstore.Key) error { + stringKey := byteutils.ConcatBytesToString(s.dbPrefix, key) + + s.operationsMutex.Lock() + defer s.operationsMutex.Unlock() + + delete(s.setOperations, stringKey) + s.deleteOperations[stringKey] = types.Void + + return nil +} + +func (s *openableKVStoreBatchedMutations) Cancel() { + s.operationsMutex.Lock() + defer s.operationsMutex.Unlock() + + s.setOperations = make(map[string]kvstore.Value) + s.deleteOperations = make(map[string]types.Empty) +} + +func (s *openableKVStoreBatchedMutations) Commit() error { + batched, err := s.parentStore.instance().Batched() + if err != nil { + return err + } + + s.operationsMutex.Lock() + defer s.operationsMutex.Unlock() + + for key, value := range s.setOperations { + if err = batched.Set([]byte(key), value); err != nil { + return err + } + } + + for key := range s.deleteOperations { + if err = batched.Delete([]byte(key)); err != nil { + return err + } + } + + return batched.Commit() +} diff --git a/pkg/storage/database/utils.go b/pkg/storage/database/utils.go index 6f009247f..0b47cf41b 100644 --- a/pkg/storage/database/utils.go +++ b/pkg/storage/database/utils.go @@ -1,11 +1,9 @@ package database -import "github.com/iotaledger/hive.go/kvstore" - -func FlushAndClose(store kvstore.KVStore) error { - if err := store.Flush(); err != nil { +func FlushAndClose(store *lockedKVStore) error { + if err := store.FlushWithoutLocking(); err != nil { return err } - return store.Close() + return store.CloseWithoutLocking() } diff --git 
a/pkg/storage/permanent/commitments.go b/pkg/storage/permanent/commitments.go index 1b5a99177..49608142e 100644 --- a/pkg/storage/permanent/commitments.go +++ b/pkg/storage/permanent/commitments.go @@ -88,3 +88,13 @@ func (c *Commitments) Import(reader io.ReadSeeker) (err error) { return nil } + +func (c *Commitments) Rollback(targetIndex iotago.SlotIndex, lastCommittedIndex iotago.SlotIndex) error { + for slotIndex := targetIndex + 1; slotIndex <= lastCommittedIndex; slotIndex++ { + if err := c.store.KVStore().Delete(lo.PanicOnErr(slotIndex.Bytes())); err != nil { + return ierrors.Wrapf(err, "failed to remove forked commitment for slot %d", slotIndex) + } + } + + return nil +} diff --git a/pkg/storage/permanent/permanent.go b/pkg/storage/permanent/permanent.go index 1d81d4399..fd576d9c6 100644 --- a/pkg/storage/permanent/permanent.go +++ b/pkg/storage/permanent/permanent.go @@ -1,6 +1,8 @@ package permanent import ( + copydir "github.com/otiai10/copy" + "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/hive.go/lo" @@ -50,6 +52,21 @@ func New(dbConfig database.Config, errorHandler func(error), opts ...options.Opt }) } +func Clone(source *Permanent, dbConfig database.Config, errorHandler func(error), opts ...options.Option[Permanent]) (*Permanent, error) { + source.store.Lock() + defer source.store.Unlock() + + source.store.CloseWithoutLocking() + + if err := copydir.Copy(source.dbConfig.Directory, dbConfig.Directory); err != nil { + return nil, ierrors.Wrap(err, "failed to copy permanent storage directory to new storage path") + } + + source.store.Open() + + return New(dbConfig, errorHandler, opts...), nil +} + func (p *Permanent) Settings() *Settings { return p.settings } diff --git a/pkg/storage/permanent/settings.go b/pkg/storage/permanent/settings.go index 94f81603e..8f54a48f2 100644 --- a/pkg/storage/permanent/settings.go +++ b/pkg/storage/permanent/settings.go @@ -355,6 +355,7 @@ func (s *Settings) Export(writer io.WriteSeeker, targetCommitment *iotago.Commit return ierrors.Wrap(err, "failed to stream write protocol version epoch mapping") } + // TODO: rollback future protocol parameters if it was added after targetCommitment.Index() // Export future protocol parameters if err := stream.WriteCollection(writer, func() (uint64, error) { var count uint64 @@ -527,6 +528,16 @@ func (s *Settings) Import(reader io.ReadSeeker) (err error) { return nil } +func (s *Settings) Rollback(targetCommitment *model.Commitment) error { + // TODO: rollback future protocol parameters if it was added after targetCommitment.Index() + + if err := s.SetLatestCommitment(targetCommitment); err != nil { + return ierrors.Wrap(err, "failed to set latest commitment") + } + + return nil +} + func (s *Settings) String() string { s.mutex.RLock() defer s.mutex.RUnlock() diff --git a/pkg/storage/prunable/bucket_manager.go b/pkg/storage/prunable/bucket_manager.go index c4ec6ce8c..33195433e 100644 --- a/pkg/storage/prunable/bucket_manager.go +++ b/pkg/storage/prunable/bucket_manager.go @@ -29,6 +29,8 @@ type BucketManager struct { dbSizes *shrinkingmap.ShrinkingMap[iotago.EpochIndex, int64] optsMaxOpenDBs int + + mutex syncutils.RWMutex } func NewBucketManager(dbConfig database.Config, errorHandler func(error), opts ...options.Option[BucketManager]) *BucketManager { @@ -77,6 +79,7 @@ func (b *BucketManager) Shutdown() { b.openDBs.Each(func(index iotago.EpochIndex, db *database.DBInstance) { db.Close() + b.openDBs.Remove(index) }) } @@ -96,7 +99,8 @@ func (b 
*BucketManager) TotalSize() int64 { b.openDBs.Each(func(key iotago.EpochIndex, val *database.DBInstance) { size, err := dbPrunableDirectorySize(b.dbConfig.Directory, key) if err != nil { - b.errorHandler(ierrors.Wrapf(err, "dbPrunableDirectorySize failed for %s: %s", b.dbConfig.Directory, key)) + b.errorHandler(ierrors.Wrapf(err, "dbPrunableDirectorySize failed for key %s: %s", b.dbConfig.Directory, key)) + return } sum += size @@ -123,7 +127,7 @@ func (b *BucketManager) BucketSize(epoch iotago.EpochIndex) (int64, error) { size, err := dbPrunableDirectorySize(b.dbConfig.Directory, epoch) if err != nil { - return 0, ierrors.Wrapf(err, "dbPrunableDirectorySize failed for %s: %s", b.dbConfig.Directory, epoch) + return 0, ierrors.Wrapf(err, "dbPrunableDirectorySize failed for epoch %s: %s", b.dbConfig.Directory, epoch) } return size, nil @@ -169,6 +173,10 @@ func (b *BucketManager) RestoreFromDisk() (lastPrunedEpoch iotago.EpochIndex) { // epochIndex 1 -> db 1 // epochIndex 2 -> db 2 func (b *BucketManager) getDBInstance(index iotago.EpochIndex) (db *database.DBInstance) { + // Lock global mutex to prevent closing and copying storage data on disk during engine switching. + b.mutex.RLock() + defer b.mutex.RUnlock() + b.openDBsMutex.Lock() defer b.openDBsMutex.Unlock() @@ -193,13 +201,28 @@ func (b *BucketManager) Prune(epoch iotago.EpochIndex) error { return ierrors.Wrapf(database.ErrNoPruningNeeded, "epoch %d is already pruned", epoch) } + b.DeleteBucket(epoch) + + b.lastPrunedEpoch.MarkEvicted(epoch) + + return nil +} + +// DeleteBucket deletes directory that stores the data for the given bucket and returns boolean +// flag indicating whether a directory for that bucket existed. +func (b *BucketManager) DeleteBucket(epoch iotago.EpochIndex) (deleted bool) { b.openDBsMutex.Lock() defer b.openDBsMutex.Unlock() + if exists, err := PathExists(dbPathFromIndex(b.dbConfig.Directory, epoch)); err != nil { + panic(err) + } else if !exists { + return false + } + db, exists := b.openDBs.Get(epoch) if exists { db.Close() - b.openDBs.Remove(epoch) } @@ -209,11 +232,22 @@ func (b *BucketManager) Prune(epoch iotago.EpochIndex) error { // Delete the db size since we pruned the whole directory b.dbSizes.Delete(epoch) - b.lastPrunedEpoch.MarkEvicted(epoch) - return nil + return true } +// RollbackBucket removes data in the bucket in slots [targetSlotIndex+1; epochEndSlot]. 
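+// It deletes the per-slot key prefixes from the epoch's bucket KVStore, so data up to and including
+// targetSlotIndex remains available to the forked engine while later slots of the old fork are discarded.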
+func (b *BucketManager) RollbackBucket(epochIndex iotago.EpochIndex, targetSlotIndex, epochEndSlot iotago.SlotIndex) error { + oldBucketKvStore := b.getDBInstance(epochIndex).KVStore() + for clearSlot := targetSlotIndex + 1; clearSlot <= epochEndSlot; clearSlot++ { + // delete slot prefix from forkedPrunable storage that will be eventually copied into the new engine + if err := oldBucketKvStore.DeletePrefix(clearSlot.MustBytes()); err != nil { + return ierrors.Wrapf(err, "error while clearing slot %d in bucket for epoch %d", clearSlot, epochIndex) + } + } + + return nil +} func (b *BucketManager) Flush() error { b.openDBsMutex.RLock() defer b.openDBsMutex.RUnlock() @@ -227,3 +261,15 @@ func (b *BucketManager) Flush() error { return err } + +func PathExists(path string) (bool, error) { + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return false, nil + } + + return false, err + } + + return true, nil +} diff --git a/pkg/storage/prunable/epochstore/epoch_kv.go b/pkg/storage/prunable/epochstore/epoch_kv.go index 0bec3e6b4..3fd580eb7 100644 --- a/pkg/storage/prunable/epochstore/epoch_kv.go +++ b/pkg/storage/prunable/epochstore/epoch_kv.go @@ -49,6 +49,10 @@ func (e *EpochKVStore) GetEpoch(epoch iotago.EpochIndex) (kvstore.KVStore, error return lo.PanicOnErr(e.kv.WithExtendedRealm(epoch.MustBytes())), nil } +func (e *EpochKVStore) DeleteEpoch(epoch iotago.EpochIndex) error { + return e.kv.DeletePrefix(epoch.MustBytes()) +} + func (e *EpochKVStore) Prune(epoch iotago.EpochIndex, defaultPruningDelay iotago.EpochIndex) error { // The epoch we're trying to prune already takes into account the defaultPruningDelay. // Therefore, we don't need to do anything if it is greater equal e.pruningDelay and take the difference otherwise. diff --git a/pkg/storage/prunable/epochstore/store.go b/pkg/storage/prunable/epochstore/store.go index 38eba9491..f93d71832 100644 --- a/pkg/storage/prunable/epochstore/store.go +++ b/pkg/storage/prunable/epochstore/store.go @@ -101,6 +101,10 @@ func (s *Store[V]) StreamBytes(consumer func([]byte, []byte) error) error { return innerErr } +func (s *Store[V]) DeleteEpoch(epoch iotago.EpochIndex) error { + return s.kv.DeletePrefix(epoch.MustBytes()) +} + func (s *Store[V]) Prune(epoch iotago.EpochIndex, defaultPruningDelay iotago.EpochIndex) error { // The epoch we're trying to prune already takes into account the defaultPruningDelay. // Therefore, we don't need to do anything if it is greater equal s.pruningDelay and take the difference otherwise. diff --git a/pkg/storage/prunable/prunable.go b/pkg/storage/prunable/prunable.go index 64c55189e..ba234ee16 100644 --- a/pkg/storage/prunable/prunable.go +++ b/pkg/storage/prunable/prunable.go @@ -1,6 +1,10 @@ package prunable import ( + "fmt" + + copydir "github.com/otiai10/copy" + "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/hive.go/runtime/ioutils" @@ -47,6 +51,28 @@ func New(dbConfig database.Config, apiProvider api.Provider, errorHandler func(e } } +func Clone(source *Prunable, dbConfig database.Config, apiProvider api.Provider, errorHandler func(error), opts ...options.Option[BucketManager]) (*Prunable, error) { + // Lock semi-permanent DB and prunable slot store so that nobody can try to use or open them while cloning. + source.semiPermanentDB.Lock() + defer source.semiPermanentDB.Unlock() + + source.prunableSlotStore.mutex.Lock() + defer source.prunableSlotStore.mutex.Unlock() + + // Close forked prunable storage before copying its contents. 
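+ // Closing flushes any in-memory state to disk so that the copied directory is complete and consistent.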
+ source.semiPermanentDB.CloseWithoutLocking() + source.prunableSlotStore.Shutdown() + + // Copy the storage on disk to new location. + if err := copydir.Copy(source.prunableSlotStore.dbConfig.Directory, dbConfig.Directory); err != nil { + return nil, ierrors.Wrap(err, "failed to copy prunable storage directory to new storage path") + } + + source.semiPermanentDB.Open() + + return New(dbConfig, apiProvider, errorHandler, opts...), nil +} + func (p *Prunable) RestoreFromDisk() (lastPrunedEpoch iotago.EpochIndex) { lastPrunedEpoch = p.prunableSlotStore.RestoreFromDisk() @@ -118,3 +144,77 @@ func (p *Prunable) Flush() { p.errorHandler(err) } } + +func (p *Prunable) Rollback(targetSlotIndex iotago.SlotIndex) error { + timeProvider := p.apiProvider.APIForSlot(targetSlotIndex).TimeProvider() + targetSlotEpoch := timeProvider.EpochFromSlot(targetSlotIndex) + lastCommittedEpoch := targetSlotEpoch + // if the target index is the last slot of the epoch, the epoch was committed + if timeProvider.EpochEnd(targetSlotEpoch) != targetSlotIndex { + lastCommittedEpoch-- + } + + if err := p.prunableSlotStore.RollbackBucket(targetSlotEpoch, targetSlotIndex, timeProvider.EpochEnd(targetSlotEpoch)); err != nil { + return ierrors.Wrapf(err, "error while rolling back slots in a bucket for epoch %d", targetSlotEpoch) + } + + // Shut down the prunableSlotStore in order to flush and get consistent state on disk after reopening. + p.prunableSlotStore.Shutdown() + + // Removed entries that belong to the old fork and cannot be re-used. + for epochIdx := lastCommittedEpoch + 1; ; epochIdx++ { + if epochIdx > targetSlotEpoch { + shouldRollback, err := p.shouldRollbackCommittee(epochIdx, targetSlotIndex) + if err != nil { + return ierrors.Wrapf(err, "error while checking if committee for epoch %d should be rolled back", epochIdx) + } + + fmt.Println("rollback committee", shouldRollback, "epochIdx", epochIdx, "lastCommittedEpoch", lastCommittedEpoch, "targetSlotEpoch", targetSlotEpoch) + if shouldRollback { + if err := p.committee.DeleteEpoch(epochIdx); err != nil { + return ierrors.Wrapf(err, "error while deleting committee for epoch %d", epochIdx) + } + } + + if deleted := p.prunableSlotStore.DeleteBucket(epochIdx); !deleted { + break + } + } + + if err := p.poolRewards.DeleteEpoch(epochIdx); err != nil { + return ierrors.Wrapf(err, "error while deleting pool rewards for epoch %d", epochIdx) + } + if err := p.poolStats.DeleteEpoch(epochIdx); err != nil { + return ierrors.Wrapf(err, "error while deleting pool stats for epoch %d", epochIdx) + } + + if err := p.decidedUpgradeSignals.DeleteEpoch(epochIdx); err != nil { + return ierrors.Wrapf(err, "error while deleting decided upgrade signals for epoch %d", epochIdx) + } + } + + return nil +} + +// Remove committee for the next epoch only if forking point is before point of no return and committee is reused. +// Always remove committees for epochs that are newer than targetSlotEpoch+1. 
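+// Hypothetical example: if targetSlotEpoch ends at slot 100 and MaxCommittableAge is 10, the point of
+// no return is slot 90; forking at slot 85 removes the next epoch's committee only if it was reused,
+// while forking at slot 95 (past the point of no return) keeps it.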
+func (p *Prunable) shouldRollbackCommittee(epochIndex iotago.EpochIndex, targetSlotIndex iotago.SlotIndex) (bool, error) { + timeProvider := p.apiProvider.APIForSlot(targetSlotIndex).TimeProvider() + targetSlotEpoch := timeProvider.EpochFromSlot(targetSlotIndex) + pointOfNoReturn := timeProvider.EpochEnd(targetSlotEpoch) - p.apiProvider.APIForSlot(targetSlotIndex).ProtocolParameters().MaxCommittableAge() + + if epochIndex >= targetSlotEpoch+1 { + if targetSlotIndex < pointOfNoReturn { + committee, err := p.committee.Load(targetSlotEpoch + 1) + if err != nil { + return false, err + } + + return committee.IsReused(), nil + } + + return false, nil + } + + return true, nil +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index ddbfa90af..85fc0eb3e 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -4,6 +4,7 @@ import ( "sync" "time" + "github.com/iotaledger/hive.go/ierrors" hivedb "github.com/iotaledger/hive.go/kvstore/database" "github.com/iotaledger/hive.go/runtime/options" "github.com/iotaledger/iota-core/pkg/model" @@ -52,7 +53,7 @@ type Storage struct { } // New creates a new storage instance with the named database version in the given directory. -func New(directory string, dbVersion byte, errorHandler func(error), opts ...options.Option[Storage]) *Storage { +func New(directory string, errorHandler func(error), opts ...options.Option[Storage]) *Storage { return options.Apply(&Storage{ dir: utils.NewDirectory(directory, true), errorHandler: errorHandler, @@ -63,18 +64,51 @@ func New(directory string, dbVersion byte, errorHandler func(error), opts ...opt optsPruningSizeMaxTargetSizeBytes: 30 * 1024 * 1024 * 1024, // 30GB optsPruningSizeReductionPercentage: 0.1, optsPruningSizeCooldownTime: 5 * time.Minute, - }, opts, - func(s *Storage) { - dbConfig := database.Config{ - Engine: s.optsDBEngine, - Directory: s.dir.PathWithCreate(permanentDirName), - Version: dbVersion, - PrefixHealth: []byte{storePrefixHealth}, - } - - s.permanent = permanent.New(dbConfig, errorHandler, s.optsPermanent...) - s.prunable = prunable.New(dbConfig.WithDirectory(s.dir.PathWithCreate(prunableDirName)), s.Settings().APIProvider(), s.errorHandler, s.optsBucketManagerOptions...) - }) + }, opts) +} + +// Create creates a new storage instance with the named database version in the given directory and initializes its permanent +// and prunable counterparts. +func Create(directory string, dbVersion byte, errorHandler func(error), opts ...options.Option[Storage]) *Storage { + s := New(directory, errorHandler, opts...) + dbConfig := database.Config{ + Engine: s.optsDBEngine, + Directory: s.dir.PathWithCreate(permanentDirName), + Version: dbVersion, + PrefixHealth: []byte{storePrefixHealth}, + } + + s.permanent = permanent.New(dbConfig, errorHandler, s.optsPermanent...) + s.prunable = prunable.New(dbConfig.WithDirectory(s.dir.PathWithCreate(prunableDirName)), s.Settings().APIProvider(), s.errorHandler, s.optsBucketManagerOptions...) + + return s +} + +// Clone creates a new storage instance with the named database version in the given directory and cloning the permannent +// and prunable counterparts from the given source storage. +func Clone(source *Storage, directory string, dbVersion byte, errorHandler func(error), opts ...options.Option[Storage]) (*Storage, error) { + s := New(directory, errorHandler, opts...) 
+ + dbConfig := database.Config{ + Engine: s.optsDBEngine, + Directory: s.dir.PathWithCreate(permanentDirName), + Version: dbVersion, + PrefixHealth: []byte{storePrefixHealth}, + } + + permanentClone, err := permanent.Clone(source.permanent, dbConfig, errorHandler) + if err != nil { + return nil, ierrors.Wrap(err, "error while cloning permanent storage") + } + prunableClone, err := prunable.Clone(source.prunable, dbConfig.WithDirectory(s.dir.PathWithCreate(prunableDirName)), permanentClone.Settings().APIProvider(), s.errorHandler, s.optsBucketManagerOptions...) + if err != nil { + return nil, ierrors.Wrap(err, "error while cloning prunable storage") + } + + s.permanent = permanentClone + s.prunable = prunableClone + + return s, nil } func (s *Storage) Directory() string { diff --git a/pkg/storage/storage_prunable.go b/pkg/storage/storage_prunable.go index 05095b9dc..c61711d05 100644 --- a/pkg/storage/storage_prunable.go +++ b/pkg/storage/storage_prunable.go @@ -73,3 +73,7 @@ func (s *Storage) RestoreFromDisk() { s.lastPrunedEpoch.MarkEvicted(lastPrunedEpoch) } + +func (s *Storage) RollbackPrunable(targetIndex iotago.SlotIndex) error { + return s.prunable.Rollback(targetIndex) +} diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 5aad3b9bf..a6e4623dd 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -6,13 +6,14 @@ import ( "github.com/stretchr/testify/require" "github.com/iotaledger/hive.go/ds/types" + "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/iota-core/pkg/storage" "github.com/iotaledger/iota-core/pkg/storage/database" iotago "github.com/iotaledger/iota.go/v4" ) func TestStorage_PruneByEpochIndex(t *testing.T) { - tf := NewTestFramework(t) + tf := NewTestFramework(t, t.TempDir()) defer tf.Shutdown() totalEpochs := 10 @@ -52,7 +53,7 @@ func TestStorage_PruneByEpochIndex(t *testing.T) { } func TestStorage_PruneByDepth(t *testing.T) { - tf := NewTestFramework(t) + tf := NewTestFramework(t, t.TempDir()) defer tf.Shutdown() totalEpochs := 20 @@ -125,7 +126,7 @@ func TestStorage_PruneByDepth(t *testing.T) { } func TestStorage_PruneBySize(t *testing.T) { - tf := NewTestFramework(t, + tf := NewTestFramework(t, t.TempDir(), storage.WithPruningDelay(2), storage.WithPruningSizeEnable(true), storage.WithPruningSizeMaxTargetSizeBytes(15*MB), @@ -166,7 +167,7 @@ func TestStorage_PruneBySize(t *testing.T) { } func TestStorage_RestoreFromDisk(t *testing.T) { - tf := NewTestFramework(t, storage.WithPruningDelay(1)) + tf := NewTestFramework(t, t.TempDir(), storage.WithPruningDelay(1)) totalEpochs := 9 tf.GeneratePermanentData(5 * MB) @@ -209,3 +210,112 @@ func TestStorage_RestoreFromDisk(t *testing.T) { types.NewTuple(0, false), ) } + +func TestStorage_CopyFromForkedStorageEmpty(t *testing.T) { + tf1 := NewTestFramework(t, t.TempDir()) + + totalEpochs := 14 + // Generate data in the old storage (source). It contains data since the genesis and one epoch after the fork. + for i := 0; i <= totalEpochs; i++ { + tf1.GeneratePrunableData(iotago.EpochIndex(i), 500*KB) + tf1.GenerateSemiPermanentData(iotago.EpochIndex(i)) + } + tf1.GeneratePermanentData(1 * MB) + + clonedStorage, err := storage.Clone(tf1.Instance, t.TempDir(), 0, func(err error) { + t.Log(err) + }) + require.NoError(t, err) + + // Assert that permanent storage contains exactly the same data. 
+ permanentKVStoreSource, err := tf1.Instance.Accounts().WithRealm(kvstore.EmptyPrefix) + require.NoError(t, err) + permanentKVStoreTarget, err := clonedStorage.Accounts().WithRealm(kvstore.EmptyPrefix) + require.NoError(t, err) + + require.NoError(t, permanentKVStoreSource.Iterate(kvstore.EmptyPrefix, func(key kvstore.Key, sourceValue kvstore.Value) bool { + targetValue, getErr := permanentKVStoreTarget.Get(key) + require.NoError(t, getErr) + + require.NotNil(t, targetValue) + require.EqualValues(t, sourceValue, targetValue) + + return true + })) + + require.NoError(t, permanentKVStoreTarget.Iterate(kvstore.EmptyPrefix, func(key kvstore.Key, sourceValue kvstore.Value) bool { + targetValue, getErr := permanentKVStoreSource.Get(key) + require.NoError(t, getErr) + + require.NotNil(t, targetValue) + require.EqualValues(t, sourceValue, targetValue) + + return true + })) + + // Assert that semiPermanentStorage contains exactly the same data. + rewardsKVStoreSource, err := tf1.Instance.RewardsForEpoch(0) + require.NoError(t, err) + semiPermanentKVStoreSource, err := rewardsKVStoreSource.WithRealm(kvstore.EmptyPrefix) + require.NoError(t, err) + rewardsKVStoreTarget, err := clonedStorage.RewardsForEpoch(0) + require.NoError(t, err) + semiPermanentKVStoreTarget, err := rewardsKVStoreTarget.WithRealm(kvstore.EmptyPrefix) + require.NoError(t, err) + + require.NoError(t, semiPermanentKVStoreSource.Iterate(kvstore.EmptyPrefix, func(key kvstore.Key, sourceValue kvstore.Value) bool { + targetValue, getErr := semiPermanentKVStoreTarget.Get(key) + require.NoError(t, getErr) + + require.NotNil(t, targetValue) + require.EqualValues(t, sourceValue, targetValue) + + return true + })) + + require.NoError(t, semiPermanentKVStoreTarget.Iterate(kvstore.EmptyPrefix, func(key kvstore.Key, sourceValue kvstore.Value) bool { + targetValue, getErr := semiPermanentKVStoreSource.Get(key) + require.NoError(t, getErr) + + require.NotNil(t, targetValue) + require.EqualValues(t, sourceValue, targetValue) + + return true + })) + + // Assert that prunableSlotStorage contains exactly the same data. 
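+ // Compare one bucket per epoch, addressed by the epoch's start slot.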
+ for epochIdx := 0; epochIdx <= totalEpochs; epochIdx++ { + // little hack to retrieve underlying prunableSlotStore KVStore without any realm + epochStartSlot := tf1.apiProvider.CurrentAPI().TimeProvider().EpochStart(iotago.EpochIndex(epochIdx)) + + attestationKVStoreSource, err := tf1.Instance.Attestations(epochStartSlot) + require.NoError(t, err) + prunableSlotKVStoreSource, err := attestationKVStoreSource.WithRealm(kvstore.EmptyPrefix) + require.NoError(t, err) + + attestationKVStoreTarget, err := clonedStorage.Attestations(epochStartSlot) + require.NoError(t, err) + prunableSlotKVStoreTarget, err := attestationKVStoreTarget.WithRealm([]byte{}) + require.NoError(t, err) + + require.NoError(t, prunableSlotKVStoreSource.Iterate(kvstore.EmptyPrefix, func(key kvstore.Key, sourceValue kvstore.Value) bool { + targetValue, getErr := prunableSlotKVStoreTarget.Get(key) + require.NoError(t, getErr) + + require.NotNil(t, targetValue) + require.EqualValues(t, sourceValue, targetValue) + + return true + })) + + require.NoError(t, prunableSlotKVStoreTarget.Iterate(kvstore.EmptyPrefix, func(key kvstore.Key, sourceValue kvstore.Value) bool { + targetValue, getErr := prunableSlotKVStoreSource.Get(key) + require.NoError(t, getErr) + + require.NotNil(t, targetValue) + require.EqualValues(t, sourceValue, targetValue) + + return true + })) + } +} diff --git a/pkg/storage/testframework_test.go b/pkg/storage/testframework_test.go index 86cd2ff7c..7a75c2407 100644 --- a/pkg/storage/testframework_test.go +++ b/pkg/storage/testframework_test.go @@ -6,6 +6,7 @@ import ( "math/rand" "path/filepath" "testing" + "time" "github.com/stretchr/testify/require" @@ -41,14 +42,13 @@ type TestFramework struct { storageFactoryFunc func() *storage.Storage } -func NewTestFramework(t *testing.T, storageOpts ...options.Option[storage.Storage]) *TestFramework { +func NewTestFramework(t *testing.T, baseDir string, storageOpts ...options.Option[storage.Storage]) *TestFramework { errorHandler := func(err error) { t.Log(err) } - baseDir := t.TempDir() storageFactoryFunc := func() *storage.Storage { - instance := storage.New(baseDir, 0, errorHandler, storageOpts...) + instance := storage.Create(baseDir, 0, errorHandler, storageOpts...) require.NoError(t, instance.Settings().StoreProtocolParametersForStartEpoch(iotago.NewV3ProtocolParameters(), 0)) return instance @@ -87,8 +87,8 @@ func (t *TestFramework) GeneratePrunableData(epoch iotago.EpochIndex, size int64 initialStorageSize := t.Instance.PrunableDatabaseSize() apiForEpoch := t.apiProvider.APIForEpoch(epoch) + startSlot := apiForEpoch.TimeProvider().EpochStart(epoch) endSlot := apiForEpoch.TimeProvider().EpochEnd(epoch) - var createdBytes int64 for createdBytes < size { block := tpkg.RandProtocolBlock(&iotago.BasicBlock{ @@ -100,7 +100,9 @@ func (t *TestFramework) GeneratePrunableData(epoch iotago.EpochIndex, size int64 modelBlock, err := model.BlockFromBlock(block, apiForEpoch) require.NoError(t.t, err) - blockStorageForSlot, err := t.Instance.Blocks(endSlot) + // block slot is randomly selected within the epoch + blockSlot := startSlot + iotago.SlotIndex(rand.Intn(int(endSlot-startSlot+1))) + blockStorageForSlot, err := t.Instance.Blocks(blockSlot) require.NoError(t.t, err) err = blockStorageForSlot.Store(modelBlock) require.NoError(t.t, err) @@ -110,6 +112,10 @@ func (t *TestFramework) GeneratePrunableData(epoch iotago.EpochIndex, size int64 } t.Instance.Flush() + + // Sleep to let RocksDB perform compaction. 
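+ // The size assertion below reads the on-disk database size, which only stabilizes once compaction has run.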
+ time.Sleep(100 * time.Millisecond) + t.assertPrunableSizeGreater(initialStorageSize + size) // fmt.Printf("> created %d MB of bucket prunable data\n\tPermanent: %dMB\n\tPrunable: %dMB\n", createdBytes/MB, t.Instance.PermanentDatabaseSize()/MB, t.Instance.PrunableDatabaseSize()/MB) @@ -140,7 +146,7 @@ func (t *TestFramework) GenerateSemiPermanentData(epoch iotago.EpochIndex) { versionAndHash := model.VersionAndHash{ Version: 1, - Hash: iotago.Identifier{2}, + Hash: tpkg.Rand32ByteArray(), } err = decidedUpgradeSignalsStore.Store(epoch, versionAndHash) require.NoError(t.t, err) @@ -204,8 +210,8 @@ func (t *TestFramework) AssertPrunedUntil( expectedDecidedUpgrades *types.Tuple[int, bool], expectedPoolStats *types.Tuple[int, bool], expectedCommittee *types.Tuple[int, bool], - expectedRewards *types.Tuple[int, bool]) { - + expectedRewards *types.Tuple[int, bool], +) { t.assertPrunedState(expectedPrunable, t.Instance.LastPrunedEpoch, "prunable") t.assertPrunedState(expectedPoolStats, t.Instance.PoolStats().LastPrunedEpoch, "pool stats") t.assertPrunedState(expectedDecidedUpgrades, t.Instance.DecidedUpgradeSignals().LastPrunedEpoch, "decided upgrades") diff --git a/pkg/tests/protocol_engine_rollback_test.go b/pkg/tests/protocol_engine_rollback_test.go new file mode 100644 index 000000000..940de5d6c --- /dev/null +++ b/pkg/tests/protocol_engine_rollback_test.go @@ -0,0 +1,781 @@ +package tests + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/iotaledger/hive.go/core/eventticker" + "github.com/iotaledger/hive.go/lo" + "github.com/iotaledger/hive.go/runtime/module" + "github.com/iotaledger/hive.go/runtime/options" + "github.com/iotaledger/iota-core/pkg/core/account" + "github.com/iotaledger/iota-core/pkg/protocol" + "github.com/iotaledger/iota-core/pkg/protocol/chainmanager" + "github.com/iotaledger/iota-core/pkg/protocol/engine" + "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" + "github.com/iotaledger/iota-core/pkg/protocol/sybilprotection/seatmanager" + "github.com/iotaledger/iota-core/pkg/protocol/sybilprotection/seatmanager/mock" + "github.com/iotaledger/iota-core/pkg/protocol/sybilprotection/sybilprotectionv1" + "github.com/iotaledger/iota-core/pkg/storage" + "github.com/iotaledger/iota-core/pkg/testsuite" + mock2 "github.com/iotaledger/iota-core/pkg/testsuite/mock" + iotago "github.com/iotaledger/iota.go/v4" +) + +func TestProtocol_EngineRollbackFinalization(t *testing.T) { + ts := testsuite.NewTestSuite(t, + testsuite.WithLivenessThreshold(1), + testsuite.WithMinCommittableAge(2), + testsuite.WithMaxCommittableAge(3), + testsuite.WithEpochNearingThreshold(5), + testsuite.WithSlotsPerEpochExponent(3), + testsuite.WithGenesisTimestampOffset(1000*10), + + testsuite.WithWaitFor(15*time.Second), + ) + defer ts.Shutdown() + + node0 := ts.AddValidatorNode("node0") + node1 := ts.AddValidatorNode("node1") + node2 := ts.AddValidatorNode("node2") + node3 := ts.AddValidatorNode("node3") + + poaProvider := func() module.Provider[*engine.Engine, seatmanager.SeatManager] { + return module.Provide(func(e *engine.Engine) seatmanager.SeatManager { + poa := mock.NewManualPOAProvider()(e).(*mock.ManualPOA) + + for _, node := range []*mock2.Node{node0, node1, node2, node3} { + if node.Validator { + poa.AddAccount(node.AccountID, node.Name) + } + } + poa.SetOnline("node0", "node1", "node2", "node3") + + return poa + }) + } + + nodeOptions := make(map[string][]options.Option[protocol.Protocol]) + for _, node := range ts.Nodes() { + 
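+ // All nodes share the same protocol options; node3 is the one whose engine is forked later in the test.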
nodeOptions[node.Name] = []options.Option[protocol.Protocol]{ + protocol.WithChainManagerOptions( + chainmanager.WithCommitmentRequesterOptions( + eventticker.RetryInterval[iotago.SlotIndex, iotago.CommitmentID](1*time.Second), + eventticker.RetryJitter[iotago.SlotIndex, iotago.CommitmentID](500*time.Millisecond), + ), + ), + protocol.WithSybilProtectionProvider( + sybilprotectionv1.NewProvider( + sybilprotectionv1.WithSeatManagerProvider( + poaProvider(), + ), + ), + ), + protocol.WithEngineOptions( + engine.WithBlockRequesterOptions( + eventticker.RetryInterval[iotago.SlotIndex, iotago.BlockID](1*time.Second), + eventticker.RetryJitter[iotago.SlotIndex, iotago.BlockID](500*time.Millisecond), + ), + ), + protocol.WithStorageOptions( + storage.WithPruningDelay(20), + ), + } + } + + ts.Run(false, nodeOptions) + + // Verify that nodes have the expected states. + + expectedCommittee := []iotago.AccountID{ + node0.AccountID, + node1.AccountID, + node2.AccountID, + node3.AccountID, + } + expectedOnlineCommitteeFull := []account.SeatIndex{ + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node0.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node1.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node2.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node3.AccountID)), + } + + for _, node := range ts.Nodes() { + node.Protocol.MainEngineInstance().SybilProtection.SeatManager().(*mock.ManualPOA).SetOnline("node0", "node1", "node2", "node3") + } + + { + genesisCommitment := iotago.NewEmptyCommitment(ts.API.ProtocolParameters().Version()) + genesisCommitment.RMC = ts.API.ProtocolParameters().CongestionControlParameters().RMCMin + ts.AssertNodeState(ts.Nodes(), + testsuite.WithSnapshotImported(true), + testsuite.WithProtocolParameters(ts.API.ProtocolParameters()), + testsuite.WithLatestCommitment(genesisCommitment), + testsuite.WithLatestFinalizedSlot(0), + testsuite.WithChainID(genesisCommitment.MustID()), + testsuite.WithStorageCommitments([]*iotago.Commitment{genesisCommitment}), + + testsuite.WithSybilProtectionCommittee(0, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeFull...), + testsuite.WithEvictedSlot(0), + testsuite.WithActiveRootBlocks(ts.Blocks("Genesis")), + testsuite.WithStorageRootBlocks(ts.Blocks("Genesis")), + ) + } + + // Issue up to slot 11 - just before committee selection for the next epoch. + // Committee will be reused at slot 10 is finalized or slot 12 is committed, whichever happens first. 
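+ // Issue through slot 11 and verify that all four validators agree on commitment 9 and finalized slot 8.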
+ { + ts.IssueBlocksAtSlots("P0:", []iotago.SlotIndex{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 4, "Genesis", ts.Nodes(), true, nil) + + ts.AssertNodeState(ts.Nodes(), + testsuite.WithLatestFinalizedSlot(8), + testsuite.WithLatestCommitmentSlotIndex(9), + testsuite.WithEqualStoredCommitmentAtIndex(9), + testsuite.WithLatestCommitmentCumulativeWeight(28), // 7 for each slot starting from 4 + testsuite.WithSybilProtectionCommittee(9, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeFull...), + testsuite.WithEvictedSlot(9), + ) + + for _, slot := range []iotago.SlotIndex{4, 5, 6, 7, 8, 9} { + var attestationBlocks []*blocks.Block + for _, node := range ts.Nodes() { + if node.Validator { + attestationBlocks = append(attestationBlocks, ts.Block(fmt.Sprintf("P0:%d.3-%s", slot, node.Name))) + } + } + ts.AssertAttestationsForSlot(slot, attestationBlocks, ts.Nodes()...) + } + + ts.AssertBlocksExist(ts.BlocksWithPrefix("P0"), true, ts.Nodes()...) + } + + { + ts.IssueBlocksAtSlots("P0:", []iotago.SlotIndex{12, 13, 14, 15, 16}, 4, "P0:11.3", ts.Nodes(), true, nil) + + ts.AssertNodeState(ts.Nodes(), + testsuite.WithLatestFinalizedSlot(13), + testsuite.WithLatestCommitmentSlotIndex(14), + testsuite.WithEqualStoredCommitmentAtIndex(14), + testsuite.WithLatestCommitmentCumulativeWeight(48), // 7 for each slot starting from 4 + testsuite.WithSybilProtectionCommittee(14, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeFull...), + testsuite.WithEvictedSlot(14), + ) + + ts.AssertBlocksExist(ts.BlocksWithPrefix("P0"), true, ts.Nodes()...) + } + + newEngine, err := node3.Protocol.EngineManager.ForkEngineAtSlot(13) + require.NoError(t, err) + + // Assert state of the forked engine after rollback. + { + require.EqualValues(t, 13, newEngine.Storage.Settings().LatestCommitment().Index()) + require.EqualValues(t, 13, newEngine.Storage.Settings().LatestFinalizedSlot()) + require.EqualValues(t, 13, newEngine.EvictionState.LastEvictedSlot()) + + for epochIndex := 0; epochIndex <= 2; epochIndex++ { + committeeEpoch, err := newEngine.Storage.Committee().Load(iotago.EpochIndex(epochIndex)) + require.NoError(t, err) + require.Len(t, committeeEpoch.IDs(), 4) + } + + // Commmittee for the future epoch does not exist. + committeeEpoch3, err := newEngine.Storage.Committee().Load(3) + require.NoError(t, err) + require.Nil(t, committeeEpoch3) + + for slotIndex := 1; slotIndex <= 13; slotIndex++ { + copiedCommitment, err := newEngine.Storage.Commitments().Load(iotago.SlotIndex(slotIndex)) + require.NoError(t, err) + sourceCommitment, err := node1.Protocol.MainEngineInstance().Storage.Commitments().Load(iotago.SlotIndex(slotIndex)) + require.NoError(t, err) + require.Equal(t, sourceCommitment.ID(), copiedCommitment.ID()) + } + + // Commitment for the first slot after the fork does not exist. 
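+ // Slot 14 was committed on the main engine but must be absent after rolling the fork back to slot 13.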
+ _, err = newEngine.Storage.Commitments().Load(iotago.SlotIndex(14)) + require.Error(t, err) + } +} + +func TestProtocol_EngineRollbackNoFinalization(t *testing.T) { + ts := testsuite.NewTestSuite(t, + testsuite.WithLivenessThreshold(1), + testsuite.WithMinCommittableAge(2), + testsuite.WithMaxCommittableAge(3), + testsuite.WithEpochNearingThreshold(5), + testsuite.WithSlotsPerEpochExponent(3), + testsuite.WithGenesisTimestampOffset(1000*10), + + testsuite.WithWaitFor(15*time.Second), + ) + defer ts.Shutdown() + + node0 := ts.AddValidatorNode("node0") + node1 := ts.AddValidatorNode("node1") + node2 := ts.AddValidatorNode("node2") + node3 := ts.AddValidatorNode("node3") + + poaProvider := func() module.Provider[*engine.Engine, seatmanager.SeatManager] { + return module.Provide(func(e *engine.Engine) seatmanager.SeatManager { + poa := mock.NewManualPOAProvider()(e).(*mock.ManualPOA) + + for _, node := range []*mock2.Node{node0, node1, node2, node3} { + if node.Validator { + poa.AddAccount(node.AccountID, node.Name) + } + } + poa.SetOnline("node0", "node1", "node2", "node3") + + return poa + }) + } + + nodeOptions := make(map[string][]options.Option[protocol.Protocol]) + for _, node := range ts.Nodes() { + nodeOptions[node.Name] = []options.Option[protocol.Protocol]{ + protocol.WithChainManagerOptions( + chainmanager.WithCommitmentRequesterOptions( + eventticker.RetryInterval[iotago.SlotIndex, iotago.CommitmentID](1*time.Second), + eventticker.RetryJitter[iotago.SlotIndex, iotago.CommitmentID](500*time.Millisecond), + ), + ), + protocol.WithSybilProtectionProvider( + sybilprotectionv1.NewProvider( + sybilprotectionv1.WithSeatManagerProvider( + poaProvider(), + ), + ), + ), + protocol.WithEngineOptions( + engine.WithBlockRequesterOptions( + eventticker.RetryInterval[iotago.SlotIndex, iotago.BlockID](1*time.Second), + eventticker.RetryJitter[iotago.SlotIndex, iotago.BlockID](500*time.Millisecond), + ), + ), + protocol.WithStorageOptions( + storage.WithPruningDelay(20), + ), + } + } + + ts.Run(false, nodeOptions) + + // Verify that nodes have the expected states. 
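+ // Unlike the first test, finalization will stall in this scenario because half of the committee goes offline before the fork.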
+ + expectedCommittee := []iotago.AccountID{ + node0.AccountID, + node1.AccountID, + node2.AccountID, + node3.AccountID, + } + expectedOnlineCommitteeFull := []account.SeatIndex{ + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node0.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node1.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node2.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node3.AccountID)), + } + + expectedOnlineCommitteeHalf := []account.SeatIndex{ + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node0.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node1.AccountID)), + } + + for _, node := range ts.Nodes() { + node.Protocol.MainEngineInstance().SybilProtection.SeatManager().(*mock.ManualPOA).SetOnline("node0", "node1", "node2", "node3") + } + + { + genesisCommitment := iotago.NewEmptyCommitment(ts.API.ProtocolParameters().Version()) + genesisCommitment.RMC = ts.API.ProtocolParameters().CongestionControlParameters().RMCMin + ts.AssertNodeState(ts.Nodes(), + testsuite.WithSnapshotImported(true), + testsuite.WithProtocolParameters(ts.API.ProtocolParameters()), + testsuite.WithLatestCommitment(genesisCommitment), + testsuite.WithLatestFinalizedSlot(0), + testsuite.WithChainID(genesisCommitment.MustID()), + testsuite.WithStorageCommitments([]*iotago.Commitment{genesisCommitment}), + + testsuite.WithSybilProtectionCommittee(0, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeFull...), + testsuite.WithEvictedSlot(0), + testsuite.WithActiveRootBlocks(ts.Blocks("Genesis")), + testsuite.WithStorageRootBlocks(ts.Blocks("Genesis")), + ) + } + + // Issue up to slot 11 - just before committee selection for the next epoch. + // Committee will be reused at slot 10 is finalized or slot 12 is committed, whichever happens first. + { + ts.IssueBlocksAtSlots("P0:", []iotago.SlotIndex{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 4, "Genesis", ts.Nodes(), true, nil) + + ts.AssertNodeState(ts.Nodes(), + testsuite.WithLatestFinalizedSlot(8), + testsuite.WithLatestCommitmentSlotIndex(9), + testsuite.WithEqualStoredCommitmentAtIndex(9), + testsuite.WithLatestCommitmentCumulativeWeight(28), // 7 for each slot starting from 4 + testsuite.WithSybilProtectionCommittee(9, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeFull...), + testsuite.WithEvictedSlot(9), + ) + + for _, slot := range []iotago.SlotIndex{4, 5, 6, 7, 8, 9} { + var attestationBlocks []*blocks.Block + for _, node := range ts.Nodes() { + if node.Validator { + attestationBlocks = append(attestationBlocks, ts.Block(fmt.Sprintf("P0:%d.3-%s", slot, node.Name))) + } + } + ts.AssertAttestationsForSlot(slot, attestationBlocks, ts.Nodes()...) + } + + ts.AssertBlocksExist(ts.BlocksWithPrefix("P0"), true, ts.Nodes()...) + } + + // Update online committee. 
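+ // Take node2 and node3 offline so that only half of the committee issues blocks and finalization stalls at slot 8.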
+ for _, node := range ts.Nodes() { + manualPOA := node.Protocol.MainEngineInstance().SybilProtection.SeatManager().(*mock.ManualPOA) + manualPOA.SetOnline("node0", "node1") + manualPOA.SetOffline("node2", "node3") + } + + { + ts.IssueBlocksAtSlots("P0:", []iotago.SlotIndex{12, 13, 14, 15, 16}, 4, "P0:11.3", []*mock2.Node{node0, node1}, true, nil) + + ts.AssertNodeState(ts.Nodes(), + testsuite.WithLatestFinalizedSlot(8), + testsuite.WithLatestCommitmentSlotIndex(14), + testsuite.WithEqualStoredCommitmentAtIndex(14), + testsuite.WithLatestCommitmentCumulativeWeight(44), // 7 for each slot starting from 4 + testsuite.WithSybilProtectionCommittee(14, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeHalf...), + testsuite.WithEvictedSlot(14), + ) + + ts.AssertBlocksExist(ts.BlocksWithPrefix("P0"), true, ts.Nodes()...) + } + + newEngine, err := node3.Protocol.EngineManager.ForkEngineAtSlot(13) + require.NoError(t, err) + + // Assert state of the forked engine after rollback. + { + require.EqualValues(t, 13, newEngine.Storage.Settings().LatestCommitment().Index()) + require.EqualValues(t, 8, newEngine.Storage.Settings().LatestFinalizedSlot()) + require.EqualValues(t, 13, newEngine.EvictionState.LastEvictedSlot()) + + for epochIndex := 0; epochIndex <= 2; epochIndex++ { + committeeEpoch, err := newEngine.Storage.Committee().Load(iotago.EpochIndex(epochIndex)) + require.NoError(t, err) + require.Len(t, committeeEpoch.IDs(), 4) + } + + // Commmittee for the future epoch does not exist. + committeeEpoch3, err := newEngine.Storage.Committee().Load(3) + require.NoError(t, err) + require.Nil(t, committeeEpoch3) + + for slotIndex := 1; slotIndex <= 13; slotIndex++ { + copiedCommitment, err := newEngine.Storage.Commitments().Load(iotago.SlotIndex(slotIndex)) + require.NoError(t, err) + sourceCommitment, err := node1.Protocol.MainEngineInstance().Storage.Commitments().Load(iotago.SlotIndex(slotIndex)) + require.NoError(t, err) + require.Equal(t, sourceCommitment.ID(), copiedCommitment.ID()) + } + + // Commitment for the first slot after the fork does not exist. 
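+ // As above, slot 14 must no longer be loadable once the forked engine has been rolled back to slot 13.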
+ _, err = newEngine.Storage.Commitments().Load(iotago.SlotIndex(14)) + require.Error(t, err) + } +} + +func TestProtocol_EngineRollbackNoFinalizationLastSlot(t *testing.T) { + ts := testsuite.NewTestSuite(t, + testsuite.WithLivenessThreshold(1), + testsuite.WithMinCommittableAge(2), + testsuite.WithMaxCommittableAge(3), + testsuite.WithEpochNearingThreshold(5), + testsuite.WithSlotsPerEpochExponent(3), + testsuite.WithGenesisTimestampOffset(1000*10), + + testsuite.WithWaitFor(15*time.Second), + ) + defer ts.Shutdown() + + node0 := ts.AddValidatorNode("node0") + node1 := ts.AddValidatorNode("node1") + node2 := ts.AddValidatorNode("node2") + node3 := ts.AddValidatorNode("node3") + + poaProvider := func() module.Provider[*engine.Engine, seatmanager.SeatManager] { + return module.Provide(func(e *engine.Engine) seatmanager.SeatManager { + poa := mock.NewManualPOAProvider()(e).(*mock.ManualPOA) + + for _, node := range []*mock2.Node{node0, node1, node2, node3} { + if node.Validator { + poa.AddAccount(node.AccountID, node.Name) + } + } + poa.SetOnline("node0", "node1", "node2", "node3") + + return poa + }) + } + + nodeOptions := make(map[string][]options.Option[protocol.Protocol]) + for _, node := range ts.Nodes() { + nodeOptions[node.Name] = []options.Option[protocol.Protocol]{ + protocol.WithChainManagerOptions( + chainmanager.WithCommitmentRequesterOptions( + eventticker.RetryInterval[iotago.SlotIndex, iotago.CommitmentID](1*time.Second), + eventticker.RetryJitter[iotago.SlotIndex, iotago.CommitmentID](500*time.Millisecond), + ), + ), + protocol.WithSybilProtectionProvider( + sybilprotectionv1.NewProvider( + sybilprotectionv1.WithSeatManagerProvider( + poaProvider(), + ), + ), + ), + protocol.WithEngineOptions( + engine.WithBlockRequesterOptions( + eventticker.RetryInterval[iotago.SlotIndex, iotago.BlockID](1*time.Second), + eventticker.RetryJitter[iotago.SlotIndex, iotago.BlockID](500*time.Millisecond), + ), + ), + protocol.WithStorageOptions( + storage.WithPruningDelay(20), + ), + } + } + + ts.Run(false, nodeOptions) + + // Verify that nodes have the expected states. 
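+ // Same four-validator setup; this scenario forks at slot 15, the last slot of an epoch.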
+ + expectedCommittee := []iotago.AccountID{ + node0.AccountID, + node1.AccountID, + node2.AccountID, + node3.AccountID, + } + expectedOnlineCommitteeFull := []account.SeatIndex{ + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node0.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node1.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node2.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node3.AccountID)), + } + + expectedOnlineCommitteeHalf := []account.SeatIndex{ + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node0.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node1.AccountID)), + } + + for _, node := range ts.Nodes() { + node.Protocol.MainEngineInstance().SybilProtection.SeatManager().(*mock.ManualPOA).SetOnline("node0", "node1", "node2", "node3") + } + + { + genesisCommitment := iotago.NewEmptyCommitment(ts.API.ProtocolParameters().Version()) + genesisCommitment.RMC = ts.API.ProtocolParameters().CongestionControlParameters().RMCMin + ts.AssertNodeState(ts.Nodes(), + testsuite.WithSnapshotImported(true), + testsuite.WithProtocolParameters(ts.API.ProtocolParameters()), + testsuite.WithLatestCommitment(genesisCommitment), + testsuite.WithLatestFinalizedSlot(0), + testsuite.WithChainID(genesisCommitment.MustID()), + testsuite.WithStorageCommitments([]*iotago.Commitment{genesisCommitment}), + + testsuite.WithSybilProtectionCommittee(0, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeFull...), + testsuite.WithEvictedSlot(0), + testsuite.WithActiveRootBlocks(ts.Blocks("Genesis")), + testsuite.WithStorageRootBlocks(ts.Blocks("Genesis")), + ) + } + + // Issue up to slot 11 - just before committee selection for the next epoch. + // Committee will be reused at slot 10 is finalized or slot 12 is committed, whichever happens first. + { + ts.IssueBlocksAtSlots("P0:", []iotago.SlotIndex{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 4, "Genesis", ts.Nodes(), true, nil) + + ts.AssertNodeState(ts.Nodes(), + testsuite.WithLatestFinalizedSlot(8), + testsuite.WithLatestCommitmentSlotIndex(9), + testsuite.WithEqualStoredCommitmentAtIndex(9), + testsuite.WithLatestCommitmentCumulativeWeight(28), // 7 for each slot starting from 4 + testsuite.WithSybilProtectionCommittee(9, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeFull...), + testsuite.WithEvictedSlot(9), + ) + + for _, slot := range []iotago.SlotIndex{4, 5, 6, 7, 8, 9} { + var attestationBlocks []*blocks.Block + for _, node := range ts.Nodes() { + if node.Validator { + attestationBlocks = append(attestationBlocks, ts.Block(fmt.Sprintf("P0:%d.3-%s", slot, node.Name))) + } + } + ts.AssertAttestationsForSlot(slot, attestationBlocks, ts.Nodes()...) + } + + ts.AssertBlocksExist(ts.BlocksWithPrefix("P0"), true, ts.Nodes()...) + } + + // Update online committee. 
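+ // Only node0 and node1 keep issuing, so commitments advance to slot 17 while the finalized slot stays at 8.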
+ for _, node := range ts.Nodes() { + manualPOA := node.Protocol.MainEngineInstance().SybilProtection.SeatManager().(*mock.ManualPOA) + manualPOA.SetOnline("node0", "node1") + manualPOA.SetOffline("node2", "node3") + } + + { + ts.IssueBlocksAtSlots("P0:", []iotago.SlotIndex{12, 13, 14, 15, 16, 17, 18, 19}, 4, "P0:11.3", []*mock2.Node{node0, node1}, true, nil) + + ts.AssertNodeState(ts.Nodes(), + testsuite.WithLatestFinalizedSlot(8), + testsuite.WithLatestCommitmentSlotIndex(17), + testsuite.WithEqualStoredCommitmentAtIndex(17), + testsuite.WithLatestCommitmentCumulativeWeight(50), // 7 for each slot starting from 4 + testsuite.WithSybilProtectionCommittee(17, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeHalf...), + testsuite.WithEvictedSlot(17), + ) + + ts.AssertBlocksExist(ts.BlocksWithPrefix("P0"), true, ts.Nodes()...) + } + + newEngine, err := node3.Protocol.EngineManager.ForkEngineAtSlot(15) + require.NoError(t, err) + + // Assert state of the forked engine after rollback. + { + require.EqualValues(t, 15, newEngine.Storage.Settings().LatestCommitment().Index()) + require.EqualValues(t, 8, newEngine.Storage.Settings().LatestFinalizedSlot()) + require.EqualValues(t, 15, newEngine.EvictionState.LastEvictedSlot()) + + for epochIndex := 0; epochIndex <= 2; epochIndex++ { + committeeEpoch, err := newEngine.Storage.Committee().Load(iotago.EpochIndex(epochIndex)) + require.NoError(t, err) + require.Len(t, committeeEpoch.IDs(), 4) + } + + // Commmittee for the future epoch does not exist. + committeeEpoch3, err := newEngine.Storage.Committee().Load(3) + require.NoError(t, err) + require.Nil(t, committeeEpoch3) + + for slotIndex := 1; slotIndex <= 15; slotIndex++ { + copiedCommitment, err := newEngine.Storage.Commitments().Load(iotago.SlotIndex(slotIndex)) + require.NoError(t, err) + sourceCommitment, err := node1.Protocol.MainEngineInstance().Storage.Commitments().Load(iotago.SlotIndex(slotIndex)) + require.NoError(t, err) + require.Equal(t, sourceCommitment.ID(), copiedCommitment.ID()) + } + + // Commitment for the first slot after the fork does not exist. 
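+ // After rolling back to slot 15, slot 16 must be gone from the forked engine's commitment store.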
+ _, err = newEngine.Storage.Commitments().Load(iotago.SlotIndex(16)) + require.Error(t, err) + } +} + +func TestProtocol_EngineRollbackNoFinalizationBeforePointOfNoReturn(t *testing.T) { + ts := testsuite.NewTestSuite(t, + testsuite.WithLivenessThreshold(1), + testsuite.WithMinCommittableAge(2), + testsuite.WithMaxCommittableAge(3), + testsuite.WithEpochNearingThreshold(5), + testsuite.WithSlotsPerEpochExponent(3), + testsuite.WithGenesisTimestampOffset(1000*10), + + testsuite.WithWaitFor(15*time.Second), + ) + defer ts.Shutdown() + + node0 := ts.AddValidatorNode("node0") + node1 := ts.AddValidatorNode("node1") + node2 := ts.AddValidatorNode("node2") + node3 := ts.AddValidatorNode("node3") + + poaProvider := func() module.Provider[*engine.Engine, seatmanager.SeatManager] { + return module.Provide(func(e *engine.Engine) seatmanager.SeatManager { + poa := mock.NewManualPOAProvider()(e).(*mock.ManualPOA) + + for _, node := range []*mock2.Node{node0, node1, node2, node3} { + if node.Validator { + poa.AddAccount(node.AccountID, node.Name) + } + } + poa.SetOnline("node0", "node1", "node2", "node3") + + return poa + }) + } + + nodeOptions := make(map[string][]options.Option[protocol.Protocol]) + for _, node := range ts.Nodes() { + nodeOptions[node.Name] = []options.Option[protocol.Protocol]{ + protocol.WithChainManagerOptions( + chainmanager.WithCommitmentRequesterOptions( + eventticker.RetryInterval[iotago.SlotIndex, iotago.CommitmentID](1*time.Second), + eventticker.RetryJitter[iotago.SlotIndex, iotago.CommitmentID](500*time.Millisecond), + ), + ), + protocol.WithSybilProtectionProvider( + sybilprotectionv1.NewProvider( + sybilprotectionv1.WithSeatManagerProvider( + poaProvider(), + ), + ), + ), + protocol.WithEngineOptions( + engine.WithBlockRequesterOptions( + eventticker.RetryInterval[iotago.SlotIndex, iotago.BlockID](1*time.Second), + eventticker.RetryJitter[iotago.SlotIndex, iotago.BlockID](500*time.Millisecond), + ), + ), + protocol.WithStorageOptions( + storage.WithPruningDelay(20), + ), + } + } + + ts.Run(false, nodeOptions) + + // Verify that nodes have the expected states. 
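+ // Same setup again; here the fork target (slot 9) lies before the point of no return, so the committee selected for the following epoch must not survive the rollback.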
+ + expectedCommittee := []iotago.AccountID{ + node0.AccountID, + node1.AccountID, + node2.AccountID, + node3.AccountID, + } + expectedOnlineCommitteeFull := []account.SeatIndex{ + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node0.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node1.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node2.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node3.AccountID)), + } + + expectedOnlineCommitteeHalf := []account.SeatIndex{ + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node0.AccountID)), + lo.Return1(node0.Protocol.MainEngineInstance().SybilProtection.SeatManager().Committee(1).GetSeat(node1.AccountID)), + } + + for _, node := range ts.Nodes() { + node.Protocol.MainEngineInstance().SybilProtection.SeatManager().(*mock.ManualPOA).SetOnline("node0", "node1", "node2", "node3") + } + + { + genesisCommitment := iotago.NewEmptyCommitment(ts.API.ProtocolParameters().Version()) + genesisCommitment.RMC = ts.API.ProtocolParameters().CongestionControlParameters().RMCMin + ts.AssertNodeState(ts.Nodes(), + testsuite.WithSnapshotImported(true), + testsuite.WithProtocolParameters(ts.API.ProtocolParameters()), + testsuite.WithLatestCommitment(genesisCommitment), + testsuite.WithLatestFinalizedSlot(0), + testsuite.WithChainID(genesisCommitment.MustID()), + testsuite.WithStorageCommitments([]*iotago.Commitment{genesisCommitment}), + + testsuite.WithSybilProtectionCommittee(0, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeFull...), + testsuite.WithEvictedSlot(0), + testsuite.WithActiveRootBlocks(ts.Blocks("Genesis")), + testsuite.WithStorageRootBlocks(ts.Blocks("Genesis")), + ) + } + + // Issue up to slot 11 - just before committee selection for the next epoch. + // Committee will be reused at slot 10 is finalized or slot 12 is committed, whichever happens first. + { + ts.IssueBlocksAtSlots("P0:", []iotago.SlotIndex{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 4, "Genesis", ts.Nodes(), true, nil) + + ts.AssertNodeState(ts.Nodes(), + testsuite.WithLatestFinalizedSlot(8), + testsuite.WithLatestCommitmentSlotIndex(9), + testsuite.WithEqualStoredCommitmentAtIndex(9), + testsuite.WithLatestCommitmentCumulativeWeight(28), // 7 for each slot starting from 4 + testsuite.WithSybilProtectionCommittee(9, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeFull...), + testsuite.WithEvictedSlot(9), + ) + + for _, slot := range []iotago.SlotIndex{4, 5, 6, 7, 8, 9} { + var attestationBlocks []*blocks.Block + for _, node := range ts.Nodes() { + if node.Validator { + attestationBlocks = append(attestationBlocks, ts.Block(fmt.Sprintf("P0:%d.3-%s", slot, node.Name))) + } + } + ts.AssertAttestationsForSlot(slot, attestationBlocks, ts.Nodes()...) + } + + ts.AssertBlocksExist(ts.BlocksWithPrefix("P0"), true, ts.Nodes()...) + } + + // Update online committee. 
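+ // node2 and node3 go offline again; only slots 12 through 15 are issued before the engine is forked at slot 9.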
+ for _, node := range ts.Nodes() { + manualPOA := node.Protocol.MainEngineInstance().SybilProtection.SeatManager().(*mock.ManualPOA) + manualPOA.SetOnline("node0", "node1") + manualPOA.SetOffline("node2", "node3") + } + + { + ts.IssueBlocksAtSlots("P0:", []iotago.SlotIndex{12, 13, 14, 15}, 4, "P0:11.3", []*mock2.Node{node0, node1}, true, nil) + + ts.AssertNodeState(ts.Nodes(), + testsuite.WithLatestFinalizedSlot(8), + testsuite.WithLatestCommitmentSlotIndex(13), + testsuite.WithEqualStoredCommitmentAtIndex(13), + testsuite.WithLatestCommitmentCumulativeWeight(42), // 7 for each slot starting from 4 + testsuite.WithSybilProtectionCommittee(13, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(expectedOnlineCommitteeHalf...), + testsuite.WithEvictedSlot(13), + ) + + ts.AssertBlocksExist(ts.BlocksWithPrefix("P0"), true, ts.Nodes()...) + } + + newEngine, err := node3.Protocol.EngineManager.ForkEngineAtSlot(9) + require.NoError(t, err) + + // Assert state of the forked engine after rollback. + { + require.EqualValues(t, 9, newEngine.Storage.Settings().LatestCommitment().Index()) + require.EqualValues(t, 8, newEngine.Storage.Settings().LatestFinalizedSlot()) + require.EqualValues(t, 9, newEngine.EvictionState.LastEvictedSlot()) + + for epochIndex := 0; epochIndex <= 1; epochIndex++ { + committeeEpoch, err := newEngine.Storage.Committee().Load(iotago.EpochIndex(epochIndex)) + require.NoError(t, err) + require.Len(t, committeeEpoch.IDs(), 4) + } + + // Commmittee for the future epoch does not exist. + committeeEpoch2, err := newEngine.Storage.Committee().Load(2) + require.NoError(t, err) + require.Nil(t, committeeEpoch2) + + for slotIndex := 1; slotIndex <= 9; slotIndex++ { + copiedCommitment, err := newEngine.Storage.Commitments().Load(iotago.SlotIndex(slotIndex)) + require.NoError(t, err) + sourceCommitment, err := node1.Protocol.MainEngineInstance().Storage.Commitments().Load(iotago.SlotIndex(slotIndex)) + require.NoError(t, err) + require.Equal(t, sourceCommitment.ID(), copiedCommitment.ID()) + } + + // Commitment for the first slot after the fork does not exist. + _, err = newEngine.Storage.Commitments().Load(iotago.SlotIndex(10)) + require.Error(t, err) + } +} + +// TODO: test fork before point of no return (slot 12) +// TODO: test fork on last slot of an epoch (slot 15) diff --git a/pkg/tests/protocol_engine_switching_test.go b/pkg/tests/protocol_engine_switching_test.go index 0d29be462..093e5e66c 100644 --- a/pkg/tests/protocol_engine_switching_test.go +++ b/pkg/tests/protocol_engine_switching_test.go @@ -319,7 +319,6 @@ func TestProtocol_EngineSwitching(t *testing.T) { manualPOA := node.Protocol.MainEngineInstance().SybilProtection.SeatManager().(*mock2.ManualPOA) manualPOA.SetOnline("node0", "node1", "node2", "node3", "node4", "node6", "node7") } - // Merge the partitions { ts.MergePartitionsToMain() @@ -362,5 +361,12 @@ func TestProtocol_EngineSwitching(t *testing.T) { wg.Wait() } + // Make sure that nodes that switched their engine still have blocks with prefix P0 from before the fork. + // Those nodes should also have all the blocks from the target fork P1 and should not have blocks from P2. + // This is to make sure that the storage was copied correctly during engine switching. + ts.AssertBlocksExist(ts.BlocksWithPrefix("P0"), true, ts.Nodes()...) + ts.AssertBlocksExist(ts.BlocksWithPrefix("P1"), true, ts.Nodes()...) + ts.AssertBlocksExist(ts.BlocksWithPrefix("P2"), false, ts.Nodes()...) 
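+ // All nodes must also agree on the stored commitment at the expected slot after the partition merge.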
+ ts.AssertEqualStoredCommitmentAtIndex(expectedCommittedSlotAfterPartitionMerge, ts.Nodes()...) } diff --git a/pkg/testsuite/mock/node.go b/pkg/testsuite/mock/node.go index 0cdb4aad0..d5905990d 100644 --- a/pkg/testsuite/mock/node.go +++ b/pkg/testsuite/mock/node.go @@ -30,7 +30,6 @@ import ( "github.com/iotaledger/iota-core/pkg/protocol/engine/filter" "github.com/iotaledger/iota-core/pkg/protocol/engine/mempool" "github.com/iotaledger/iota-core/pkg/protocol/engine/notarization" - "github.com/iotaledger/iota-core/pkg/protocol/engine/tipmanager" iotago "github.com/iotaledger/iota.go/v4" "github.com/iotaledger/iota.go/v4/merklehasher" ) @@ -182,21 +181,17 @@ func (n *Node) hookLogging(failOnBlockFiltered bool) { fmt.Printf("%s > Network.AttestationsRequestReceived: from %s %s\n", n.Name, source, id) }) - events.ChainManager.RequestCommitment.Hook(func(commitmentID iotago.CommitmentID) { - fmt.Printf("%s > ChainManager.RequestCommitment: %s\n", n.Name, commitmentID) - }) - - events.ChainManager.CommitmentBelowRoot.Hook(func(commitmentID iotago.CommitmentID) { - fmt.Printf("%s > ChainManager.CommitmentBelowRoot: %s\n", n.Name, commitmentID) - }) + //events.ChainManager.CommitmentBelowRoot.Hook(func(commitmentID iotago.CommitmentID) { + // fmt.Printf("%s > ChainManager.CommitmentBelowRoot: %s\n", n.Name, commitmentID) + //}) events.ChainManager.ForkDetected.Hook(func(fork *chainmanager.Fork) { fmt.Printf("%s > ChainManager.ForkDetected: %s\n", n.Name, fork) }) - events.Engine.TipManager.BlockAdded.Hook(func(tipMetadata tipmanager.TipMetadata) { - fmt.Printf("%s > TipManager.BlockAdded: %s in pool %d\n", n.Name, tipMetadata.ID(), tipMetadata.TipPool().Get()) - }) + //events.Engine.TipManager.BlockAdded.Hook(func(tipMetadata tipmanager.TipMetadata) { + // fmt.Printf("%s > TipManager.BlockAdded: %s in pool %d\n", n.Name, tipMetadata.ID(), tipMetadata.TipPool().Get()) + //}) events.CandidateEngineActivated.Hook(func(e *engine.Engine) { fmt.Printf("%s > CandidateEngineActivated: %s, ChainID:%s Index:%s\n", n.Name, e.Name(), e.ChainID(), e.ChainID().Index())