diff --git a/pkg/protocol/engines.go b/pkg/protocol/engines.go
index c3be37c89..7ec714ec9 100644
--- a/pkg/protocol/engines.go
+++ b/pkg/protocol/engines.go
@@ -81,29 +81,39 @@ func (e *Engines) initLogging() (shutdown func()) {
 
 // ForkAtSlot creates a new engine instance that forks from the main engine at the given slot.
 func (e *Engines) ForkAtSlot(slot iotago.SlotIndex) (*engine.Engine, error) {
+	e.LogInfo("forking engine at slot", "slot", slot)
+
 	newEngineAlias := lo.PanicOnErr(uuid.NewUUID()).String()
 	errorHandler := func(err error) {
 		e.protocol.LogError("engine error", "err", err, "name", newEngineAlias[0:8])
 	}
 
+	e.LogInfo("forking engine at slot 2", "slot", slot)
+
 	// copy raw data on disk.
 	newStorage, err := storage.Clone(e, e.Main.Get().Storage, e.directory.Path(newEngineAlias), DatabaseVersion, errorHandler, e.protocol.Options.StorageOptions...)
 	if err != nil {
 		return nil, ierrors.Wrapf(err, "failed to copy storage from active engine instance (%s) to new engine instance (%s)", e.Main.Get().Storage.Directory(), e.directory.Path(newEngineAlias))
 	}
 
+	e.LogInfo("forking engine at slot 3", "slot", slot)
+
 	// remove commitments that after forking point.
 	latestCommitment := newStorage.Settings().LatestCommitment()
 	if err = newStorage.Commitments().Rollback(slot, latestCommitment.Slot()); err != nil {
 		return nil, ierrors.Wrap(err, "failed to rollback commitments")
 	}
 
+	e.LogInfo("forking engine at slot 4", "slot", slot)
+
 	// some components are automatically rolled back by deleting their data on disk (e.g. slot based storage).
 	// some other components need to be rolled back manually, like the UTXO ledger for example.
 	// we need to create temporary components to rollback their permanent state, which will be reflected on disk.
 	evictionState := eviction.NewState(newStorage.Settings(), newStorage.RootBlocks)
 	evictionState.Initialize(latestCommitment.Slot())
 
+	e.LogInfo("forking engine at slot 5", "slot", slot)
+
 	blockCache := blocks.New(evictionState, newStorage.Settings().APIProvider())
 
 	accountsManager := accountsledger.New(e.protocol.NewSubModule("ForkedAccountsLedger"), newStorage.Settings().APIProvider(), blockCache.Block, newStorage.AccountDiffs, newStorage.Accounts())
@@ -119,19 +129,27 @@ func (e *Engines) ForkAtSlot(slot iotago.SlotIndex) (*engine.Engine, error) {
 		return nil, err
 	}
 
+	e.LogInfo("forking engine at slot 6", "slot", slot)
+
 	targetCommitment, err := newStorage.Commitments().Load(slot)
 	if err != nil {
 		return nil, ierrors.Wrapf(err, "error while retrieving commitment for target index %d", slot)
 	}
 
+	e.LogInfo("forking engine at slot 6.1", "slot", slot)
+
 	if err = newStorage.Settings().Rollback(targetCommitment); err != nil {
 		return nil, err
 	}
 
+	e.LogInfo("forking engine at slot 6.2", "slot", slot)
 	if err = newStorage.Rollback(slot); err != nil {
+		e.LogError("failed to rollback storage", "err", err)
 		return nil, err
 	}
 
+	e.LogInfo("forking engine at slot 7", "slot", slot)
+
 	candidateEngine := e.loadEngineInstanceWithStorage(newEngineAlias, newStorage)
 
 	// rollback attestations already on created engine instance, because this action modifies the in-memory storage.
@@ -139,6 +157,8 @@ func (e *Engines) ForkAtSlot(slot iotago.SlotIndex) (*engine.Engine, error) {
 		return nil, ierrors.Wrap(err, "error while rolling back attestations storage on candidate engine")
 	}
 
+	e.LogInfo("forking engine at slot 8", "slot", slot)
+
 	return candidateEngine, nil
 }
 
@@ -274,7 +294,9 @@ func (e *Engines) injectEngineInstances() (shutdown func()) {
 			} else {
 				e.protocol.Network.OnShutdown(func() { newEngine.ShutdownEvent().Trigger() })
 
+				e.LogInfo("injecting engine instance before", "chain", chain.LogName())
 				chain.Engine.Set(newEngine)
+				e.LogInfo("injecting engine instance after", "chain", chain.LogName())
 			}
 		})
 	})
diff --git a/pkg/storage/prunable/epochstore/epoch_kv.go b/pkg/storage/prunable/epochstore/epoch_kv.go
index 1c106b28a..0275afd26 100644
--- a/pkg/storage/prunable/epochstore/epoch_kv.go
+++ b/pkg/storage/prunable/epochstore/epoch_kv.go
@@ -1,6 +1,8 @@
 package epochstore
 
 import (
+	"fmt"
+
 	"github.com/iotaledger/hive.go/ierrors"
 	"github.com/iotaledger/hive.go/kvstore"
 	"github.com/iotaledger/hive.go/lo"
@@ -108,10 +110,12 @@ func (e *EpochKVStore) Prune(epoch iotago.EpochIndex, defaultPruningDelay iotago
 
 func (e *EpochKVStore) RollbackEpochs(epoch iotago.EpochIndex) (lastPrunedEpoch iotago.EpochIndex, err error) {
 	lastAccessedEpoch, err := e.LastAccessedEpoch()
+	fmt.Println("lastAccessedEpoch: ", lastAccessedEpoch)
 	if err != nil {
 		return lastAccessedEpoch, ierrors.Wrap(err, "failed to get last accessed epoch")
 	}
 
+	fmt.Println("epoch: ", epoch)
 	for epochToPrune := epoch; epochToPrune <= lastAccessedEpoch; epochToPrune++ {
 		if err = e.DeleteEpoch(epochToPrune); err != nil {
 			return epochToPrune, ierrors.Wrapf(err, "error while deleting epoch %d", epochToPrune)
diff --git a/pkg/storage/prunable/prunable.go b/pkg/storage/prunable/prunable.go
index e90f2b0b8..e186eedb6 100644
--- a/pkg/storage/prunable/prunable.go
+++ b/pkg/storage/prunable/prunable.go
@@ -1,6 +1,8 @@
 package prunable
 
 import (
+	"fmt"
+
 	copydir "github.com/otiai10/copy"
 
 	"github.com/iotaledger/hive.go/ierrors"
@@ -171,34 +173,47 @@ func (p *Prunable) Flush() {
 }
 
 func (p *Prunable) Rollback(targetEpoch iotago.EpochIndex, startPruneRange iotago.SlotIndex, endPruneRange iotago.SlotIndex) error {
+	fmt.Println("Rollback", targetEpoch, startPruneRange, endPruneRange)
+
 	if err := p.prunableSlotStore.PruneSlots(targetEpoch, startPruneRange, endPruneRange); err != nil {
 		return ierrors.Wrapf(err, "failed to prune slots in range [%d, %d] from target epoch %d", startPruneRange, endPruneRange, targetEpoch)
 	}
 
+	fmt.Println("Rollback 2", targetEpoch, startPruneRange, endPruneRange)
+
 	if err := p.rollbackCommitteesCandidates(targetEpoch, startPruneRange); err != nil {
 		return ierrors.Wrapf(err, "failed to rollback committee candidates to target epoch %d", targetEpoch)
 	}
 
+	fmt.Println("Rollback 3", targetEpoch, startPruneRange, endPruneRange)
 	lastPrunedCommitteeEpoch, err := p.rollbackCommitteeEpochs(targetEpoch+1, startPruneRange-1)
 	if err != nil {
 		return ierrors.Wrapf(err, "failed to rollback committee epochs to target epoch %d", targetEpoch)
 	}
 
+	fmt.Println("Rollback 4", targetEpoch, startPruneRange, endPruneRange)
+
 	lastPrunedPoolStatsEpoch, _, err := p.poolStats.RollbackEpochs(targetEpoch)
 	if err != nil {
 		return ierrors.Wrapf(err, "failed to rollback pool stats epochs to target epoch %d", targetEpoch)
 	}
 
+	fmt.Println("Rollback 5", targetEpoch, startPruneRange, endPruneRange)
+
 	lastPrunedDecidedUpgradeSignalsEpoch, _, err := p.decidedUpgradeSignals.RollbackEpochs(targetEpoch)
 	if err != nil {
 		return ierrors.Wrapf(err, "failed to rollback decided upgrade signals epochs to target epoch %d", targetEpoch)
 	}
 
+	fmt.Println("Rollback 6", targetEpoch, startPruneRange, endPruneRange)
+
 	lastPrunedPoolRewardsEpoch, err := p.poolRewards.RollbackEpochs(targetEpoch)
 	if err != nil {
 		return ierrors.Wrapf(err, "failed to rollback pool rewards epochs to target epoch %d", targetEpoch)
 	}
 
+	fmt.Println("Rollback 7 ", targetEpoch, startPruneRange, endPruneRange)
+
 	for epochToPrune := targetEpoch + 1; epochToPrune <= max(
 		lastPrunedCommitteeEpoch,
 		lastPrunedPoolStatsEpoch,
@@ -207,6 +222,7 @@ func (p *Prunable) Rollback(targetEpoch iotago.EpochIndex, startPruneRange iotag
 	); epochToPrune++ {
 		p.prunableSlotStore.DeleteBucket(epochToPrune)
 	}
+	fmt.Println("Rollback 8", targetEpoch, startPruneRange, endPruneRange)
 
 	return nil
 }
diff --git a/pkg/tests/loss_of_acceptance_test.go b/pkg/tests/loss_of_acceptance_test.go
index 7ad739286..f3eb950a8 100644
--- a/pkg/tests/loss_of_acceptance_test.go
+++ b/pkg/tests/loss_of_acceptance_test.go
@@ -2,6 +2,9 @@ package tests
 
 import (
 	"fmt"
+	"net/http"
+	_ "net/http"
+	_ "net/http/pprof"
 	"testing"
 	"time"
 
@@ -18,6 +21,12 @@ import (
 )
 
 func TestLossOfAcceptanceFromGenesis(t *testing.T) {
+	// debug.SetEnabled(true)
+
+	go func() {
+		fmt.Println(http.ListenAndServe("localhost:6061", nil))
+	}()
+
 	ts := testsuite.NewTestSuite(t,
 		testsuite.WithProtocolParametersOptions(
 			iotago.WithTimeProviderOptions(
@@ -36,21 +45,21 @@ func TestLossOfAcceptanceFromGenesis(t *testing.T) {
 		),
 		testsuite.WithWaitFor(15*time.Second),
 	)
-	defer ts.Shutdown()
+	// defer ts.Shutdown()
 
 	node0 := ts.AddValidatorNode("node0")
 	ts.AddDefaultWallet(node0)
 	node1 := ts.AddValidatorNode("node1")
-	node2 := ts.AddNode("node2")
+	// node2 := ts.AddNode("node2")
 
-	nodesP1 := []*mock.Node{node0, node2}
+	nodesP1 := []*mock.Node{node0}
 	nodesP2 := []*mock.Node{node1}
 
 	ts.Run(true, nil)
 
-	node0.Protocol.SetLogLevel(log.LevelFatal)
-	node1.Protocol.SetLogLevel(log.LevelFatal)
-	node2.Protocol.SetLogLevel(log.LevelFatal)
+	node0.Protocol.SetLogLevel(log.LevelTrace)
+	// node1.Protocol.SetLogLevel(log.LevelTrace)
+	// node2.Protocol.SetLogLevel(log.LevelFatal)
 
 	// Create snapshot to use later.
 	snapshotPath := ts.Directory.Path(fmt.Sprintf("%d_snapshot", time.Now().Unix()))
@@ -101,8 +110,8 @@ func TestLossOfAcceptanceFromGenesis(t *testing.T) {
 		ts.AssertBlocksExist(ts.BlocksWithPrefix("P1"), true, ts.ClientsForNodes(nodesP1...)...)
 		ts.AssertBlocksExist(ts.BlocksWithPrefix("P1"), false, ts.ClientsForNodes(nodesP2...)...)
 	}
-	ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], node0, node2)
-	ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], node1)
+	ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], nodesP1...)
+	ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], nodesP2...)
 
 	// Issue in P2
 	{
@@ -115,32 +124,36 @@ func TestLossOfAcceptanceFromGenesis(t *testing.T) {
 		ts.AssertBlocksExist(ts.BlocksWithPrefix("P2"), false, ts.ClientsForNodes(nodesP1...)...)
 		ts.AssertBlocksExist(ts.BlocksWithPrefix("P2"), true, ts.ClientsForNodes(nodesP2...)...)
 	}
-	ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], node0, node2)
-	ts.AssertSybilProtectionOnlineCommittee(seatIndexes[1:2], node1)
+	ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], nodesP1...)
+	ts.AssertSybilProtectionOnlineCommittee(seatIndexes[1:2], nodesP2...)
 
 	// Start node3 from genesis snapshot.
-	node3 := ts.AddNode("node3")
-	{
-		node3.Initialize(true,
-			protocol.WithSnapshotPath(snapshotPath),
-			protocol.WithBaseDirectory(ts.Directory.PathWithCreate(node3.Name)),
-		)
-		node3.Protocol.SetLogLevel(log.LevelTrace)
-		ts.Wait()
-	}
-
+	// node3 := ts.AddNode("node3")
+	// {
+	// 	node3.Initialize(true,
+	// 		protocol.WithSnapshotPath(snapshotPath),
+	// 		protocol.WithBaseDirectory(ts.Directory.PathWithCreate(node3.Name)),
+	// 	)
+	// 	node3.Protocol.SetLogLevel(log.LevelTrace)
+	// 	ts.Wait()
+	// }
 	ts.MergePartitionsToMain()
 
 	fmt.Println("\n=========================\nMerged network partitions\n=========================")
 
 	// Continue issuing on all nodes on top of their chain, respectively.
 	{
-		ts.IssueBlocksAtSlots("P2:", []iotago.SlotIndex{62}, 1, "P2:61.2", nodesP2, false, false)
-		ts.IssueBlocksAtSlots("P1:", []iotago.SlotIndex{62}, 1, "P1:61.2", nodesP1, false, false)
+		ts.IssueBlocksAtSlots("P2:", []iotago.SlotIndex{61}, 1, "P2:61.2", nodesP2, false, false)
+		ts.IssueBlocksAtSlots("P1:", []iotago.SlotIndex{61}, 1, "P1:61.2", nodesP1, false, false)
+		ts.Wait()
+
+		fmt.Println(">>>>>>> Checking stuff...")
 
 		// ts.AssertBlocksInCacheAccepted(ts.BlocksWithPrefix("59.0"), true, ts.Nodes()...)
 		ts.AssertLatestCommitmentSlotIndex(59, ts.Nodes()...)
-
+		fmt.Println(">>>>>>> Latest commitment slot index:", 59)
+		// pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
 		ts.AssertEqualStoredCommitmentAtIndex(59, ts.Nodes()...)
+		fmt.Println(">>>>>>> Stored commitment at index 59 is equal for all nodes.")
 	}
 }