Skip to content

Commit

Permalink
moar debug logs to finally find lastAccessedEpoch bug and node being …
Browse files Browse the repository at this point in the history
…stuck in "deleting" practically endless (ghost) epochs from disk
  • Loading branch information
jonastheis committed Apr 17, 2024
1 parent 9db69b6 commit 8206baa
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 23 deletions.
22 changes: 22 additions & 0 deletions pkg/protocol/engines.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,29 +81,39 @@ func (e *Engines) initLogging() (shutdown func()) {

// ForkAtSlot creates a new engine instance that forks from the main engine at the given slot.
func (e *Engines) ForkAtSlot(slot iotago.SlotIndex) (*engine.Engine, error) {
e.LogInfo("forking engine at slot", "slot", slot)

newEngineAlias := lo.PanicOnErr(uuid.NewUUID()).String()
errorHandler := func(err error) {
e.protocol.LogError("engine error", "err", err, "name", newEngineAlias[0:8])
}

e.LogInfo("forking engine at slot 2", "slot", slot)

// copy raw data on disk.
newStorage, err := storage.Clone(e, e.Main.Get().Storage, e.directory.Path(newEngineAlias), DatabaseVersion, errorHandler, e.protocol.Options.StorageOptions...)
if err != nil {
return nil, ierrors.Wrapf(err, "failed to copy storage from active engine instance (%s) to new engine instance (%s)", e.Main.Get().Storage.Directory(), e.directory.Path(newEngineAlias))
}

e.LogInfo("forking engine at slot 3", "slot", slot)

// remove commitments that are after the forking point.
latestCommitment := newStorage.Settings().LatestCommitment()
if err = newStorage.Commitments().Rollback(slot, latestCommitment.Slot()); err != nil {
return nil, ierrors.Wrap(err, "failed to rollback commitments")
}

e.LogInfo("forking engine at slot 4", "slot", slot)

// some components are automatically rolled back by deleting their data on disk (e.g. slot based storage).
// some other components need to be rolled back manually, like the UTXO ledger for example.
// we need to create temporary components to rollback their permanent state, which will be reflected on disk.
evictionState := eviction.NewState(newStorage.Settings(), newStorage.RootBlocks)
evictionState.Initialize(latestCommitment.Slot())

e.LogInfo("forking engine at slot 5", "slot", slot)

blockCache := blocks.New(evictionState, newStorage.Settings().APIProvider())
accountsManager := accountsledger.New(e.protocol.NewSubModule("ForkedAccountsLedger"), newStorage.Settings().APIProvider(), blockCache.Block, newStorage.AccountDiffs, newStorage.Accounts())

Expand All @@ -119,26 +129,36 @@ func (e *Engines) ForkAtSlot(slot iotago.SlotIndex) (*engine.Engine, error) {
return nil, err
}

e.LogInfo("forking engine at slot 6", "slot", slot)

targetCommitment, err := newStorage.Commitments().Load(slot)
if err != nil {
return nil, ierrors.Wrapf(err, "error while retrieving commitment for target index %d", slot)
}

e.LogInfo("forking engine at slot 6.1", "slot", slot)

if err = newStorage.Settings().Rollback(targetCommitment); err != nil {
return nil, err
}

e.LogInfo("forking engine at slot 6.2", "slot", slot)
if err = newStorage.Rollback(slot); err != nil {
e.LogError("failed to rollback storage", "err", err)
return nil, err
}

e.LogInfo("forking engine at slot 7", "slot", slot)

candidateEngine := e.loadEngineInstanceWithStorage(newEngineAlias, newStorage)

// rollback attestations on the already-created engine instance, because this action modifies the in-memory storage.
if err = candidateEngine.Attestations.Rollback(slot); err != nil {
return nil, ierrors.Wrap(err, "error while rolling back attestations storage on candidate engine")
}

e.LogInfo("forking engine at slot 8", "slot", slot)

return candidateEngine, nil
}

Expand Down Expand Up @@ -274,7 +294,9 @@ func (e *Engines) injectEngineInstances() (shutdown func()) {
} else {
e.protocol.Network.OnShutdown(func() { newEngine.ShutdownEvent().Trigger() })

e.LogInfo("injecting engine instance before", "chain", chain.LogName())
chain.Engine.Set(newEngine)
e.LogInfo("injecting engine instance after", "chain", chain.LogName())
}
})
})
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/prunable/epochstore/epoch_kv.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package epochstore

import (
"fmt"

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/lo"
Expand Down Expand Up @@ -108,10 +110,12 @@ func (e *EpochKVStore) Prune(epoch iotago.EpochIndex, defaultPruningDelay iotago

func (e *EpochKVStore) RollbackEpochs(epoch iotago.EpochIndex) (lastPrunedEpoch iotago.EpochIndex, err error) {
lastAccessedEpoch, err := e.LastAccessedEpoch()
fmt.Println("lastAccessedEpoch: ", lastAccessedEpoch)
if err != nil {
return lastAccessedEpoch, ierrors.Wrap(err, "failed to get last accessed epoch")
}

fmt.Println("epoch: ", epoch)
for epochToPrune := epoch; epochToPrune <= lastAccessedEpoch; epochToPrune++ {
if err = e.DeleteEpoch(epochToPrune); err != nil {
return epochToPrune, ierrors.Wrapf(err, "error while deleting epoch %d", epochToPrune)
Expand Down
16 changes: 16 additions & 0 deletions pkg/storage/prunable/prunable.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package prunable

import (
"fmt"

copydir "github.com/otiai10/copy"

"github.com/iotaledger/hive.go/ierrors"
Expand Down Expand Up @@ -171,34 +173,47 @@ func (p *Prunable) Flush() {
}

func (p *Prunable) Rollback(targetEpoch iotago.EpochIndex, startPruneRange iotago.SlotIndex, endPruneRange iotago.SlotIndex) error {
fmt.Println("Rollback", targetEpoch, startPruneRange, endPruneRange)

if err := p.prunableSlotStore.PruneSlots(targetEpoch, startPruneRange, endPruneRange); err != nil {
return ierrors.Wrapf(err, "failed to prune slots in range [%d, %d] from target epoch %d", startPruneRange, endPruneRange, targetEpoch)
}

fmt.Println("Rollback 2", targetEpoch, startPruneRange, endPruneRange)

if err := p.rollbackCommitteesCandidates(targetEpoch, startPruneRange); err != nil {
return ierrors.Wrapf(err, "failed to rollback committee candidates to target epoch %d", targetEpoch)
}
fmt.Println("Rollback 3", targetEpoch, startPruneRange, endPruneRange)

lastPrunedCommitteeEpoch, err := p.rollbackCommitteeEpochs(targetEpoch+1, startPruneRange-1)
if err != nil {
return ierrors.Wrapf(err, "failed to rollback committee epochs to target epoch %d", targetEpoch)
}

fmt.Println("Rollback 4", targetEpoch, startPruneRange, endPruneRange)

lastPrunedPoolStatsEpoch, _, err := p.poolStats.RollbackEpochs(targetEpoch)
if err != nil {
return ierrors.Wrapf(err, "failed to rollback pool stats epochs to target epoch %d", targetEpoch)
}

fmt.Println("Rollback 5", targetEpoch, startPruneRange, endPruneRange)

lastPrunedDecidedUpgradeSignalsEpoch, _, err := p.decidedUpgradeSignals.RollbackEpochs(targetEpoch)
if err != nil {
return ierrors.Wrapf(err, "failed to rollback decided upgrade signals epochs to target epoch %d", targetEpoch)
}

fmt.Println("Rollback 6", targetEpoch, startPruneRange, endPruneRange)

lastPrunedPoolRewardsEpoch, err := p.poolRewards.RollbackEpochs(targetEpoch)
if err != nil {
return ierrors.Wrapf(err, "failed to rollback pool rewards epochs to target epoch %d", targetEpoch)
}

fmt.Println("Rollback 7 ", targetEpoch, startPruneRange, endPruneRange)

for epochToPrune := targetEpoch + 1; epochToPrune <= max(
lastPrunedCommitteeEpoch,
lastPrunedPoolStatsEpoch,
Expand All @@ -207,6 +222,7 @@ func (p *Prunable) Rollback(targetEpoch iotago.EpochIndex, startPruneRange iotag
); epochToPrune++ {
p.prunableSlotStore.DeleteBucket(epochToPrune)
}
fmt.Println("Rollback 8", targetEpoch, startPruneRange, endPruneRange)

return nil
}
Expand Down
59 changes: 36 additions & 23 deletions pkg/tests/loss_of_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package tests

import (
"fmt"
"net/http"
_ "net/http"
_ "net/http/pprof"
"testing"
"time"

Expand All @@ -18,6 +21,12 @@ import (
)

func TestLossOfAcceptanceFromGenesis(t *testing.T) {
// debug.SetEnabled(true)

go func() {
fmt.Println(http.ListenAndServe("localhost:6061", nil))
}()

ts := testsuite.NewTestSuite(t,
testsuite.WithProtocolParametersOptions(
iotago.WithTimeProviderOptions(
Expand All @@ -36,21 +45,21 @@ func TestLossOfAcceptanceFromGenesis(t *testing.T) {
),
testsuite.WithWaitFor(15*time.Second),
)
defer ts.Shutdown()
// defer ts.Shutdown()

node0 := ts.AddValidatorNode("node0")
ts.AddDefaultWallet(node0)
node1 := ts.AddValidatorNode("node1")
node2 := ts.AddNode("node2")
// node2 := ts.AddNode("node2")

nodesP1 := []*mock.Node{node0, node2}
nodesP1 := []*mock.Node{node0}
nodesP2 := []*mock.Node{node1}

ts.Run(true, nil)

node0.Protocol.SetLogLevel(log.LevelFatal)
node1.Protocol.SetLogLevel(log.LevelFatal)
node2.Protocol.SetLogLevel(log.LevelFatal)
node0.Protocol.SetLogLevel(log.LevelTrace)
// node1.Protocol.SetLogLevel(log.LevelTrace)
// node2.Protocol.SetLogLevel(log.LevelFatal)

// Create snapshot to use later.
snapshotPath := ts.Directory.Path(fmt.Sprintf("%d_snapshot", time.Now().Unix()))
Expand Down Expand Up @@ -101,8 +110,8 @@ func TestLossOfAcceptanceFromGenesis(t *testing.T) {
ts.AssertBlocksExist(ts.BlocksWithPrefix("P1"), true, ts.ClientsForNodes(nodesP1...)...)
ts.AssertBlocksExist(ts.BlocksWithPrefix("P1"), false, ts.ClientsForNodes(nodesP2...)...)
}
ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], node0, node2)
ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], node1)
ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], nodesP1...)
ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], nodesP2...)

// Issue in P2
{
Expand All @@ -115,32 +124,36 @@ func TestLossOfAcceptanceFromGenesis(t *testing.T) {
ts.AssertBlocksExist(ts.BlocksWithPrefix("P2"), false, ts.ClientsForNodes(nodesP1...)...)
ts.AssertBlocksExist(ts.BlocksWithPrefix("P2"), true, ts.ClientsForNodes(nodesP2...)...)
}
ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], node0, node2)
ts.AssertSybilProtectionOnlineCommittee(seatIndexes[1:2], node1)
ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], nodesP1...)
ts.AssertSybilProtectionOnlineCommittee(seatIndexes[1:2], nodesP2...)

// Start node3 from genesis snapshot.
node3 := ts.AddNode("node3")
{
node3.Initialize(true,
protocol.WithSnapshotPath(snapshotPath),
protocol.WithBaseDirectory(ts.Directory.PathWithCreate(node3.Name)),
)
node3.Protocol.SetLogLevel(log.LevelTrace)
ts.Wait()
}

// node3 := ts.AddNode("node3")
// {
// node3.Initialize(true,
// protocol.WithSnapshotPath(snapshotPath),
// protocol.WithBaseDirectory(ts.Directory.PathWithCreate(node3.Name)),
// )
// node3.Protocol.SetLogLevel(log.LevelTrace)
// ts.Wait()
// }
ts.MergePartitionsToMain()
fmt.Println("\n=========================\nMerged network partitions\n=========================")

// Continue issuing on all nodes on top of their chain, respectively.
{
ts.IssueBlocksAtSlots("P2:", []iotago.SlotIndex{62}, 1, "P2:61.2", nodesP2, false, false)
ts.IssueBlocksAtSlots("P1:", []iotago.SlotIndex{62}, 1, "P1:61.2", nodesP1, false, false)
ts.IssueBlocksAtSlots("P2:", []iotago.SlotIndex{61}, 1, "P2:61.2", nodesP2, false, false)
ts.IssueBlocksAtSlots("P1:", []iotago.SlotIndex{61}, 1, "P1:61.2", nodesP1, false, false)

ts.Wait()

fmt.Println(">>>>>>> Checking stuff...")
// ts.AssertBlocksInCacheAccepted(ts.BlocksWithPrefix("59.0"), true, ts.Nodes()...)
ts.AssertLatestCommitmentSlotIndex(59, ts.Nodes()...)

fmt.Println(">>>>>>> Latest commitment slot index:", 59)
// pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
ts.AssertEqualStoredCommitmentAtIndex(59, ts.Nodes()...)
fmt.Println(">>>>>>> Stored commitment at index 59 is equal for all nodes.")
}
}

Expand Down

0 comments on commit 8206baa

Please sign in to comment.