diff --git a/pkg/protocol/attestations.go b/pkg/protocol/attestations.go
index 3904f7eba..719ef7c17 100644
--- a/pkg/protocol/attestations.go
+++ b/pkg/protocol/attestations.go
@@ -78,8 +78,11 @@ func (a *Attestations) initRequester() (shutdown func()) {
     unsubscribeFromTicker := lo.Batch(
         a.protocol.Commitments.WithElements(func(commitment *Commitment) (shutdown func()) {
             return commitment.RequestAttestations.WithNonEmptyValue(func(_ bool) (teardown func()) {
-                if commitment.CumulativeWeight() == 0 {
-                    commitment.IsAttested.Set(true)
+                if commitment.CumulativeWeight.Get() == 0 {
+                    // execute in worker pool since it can have long-running effects (e.g. chain switching)
+                    a.workerPool.Submit(func() {
+                        commitment.IsAttested.Set(true)
+                    })
 
                     return nil
                 }
@@ -161,7 +164,10 @@ func (a *Attestations) processResponse(commitment *model.Commitment, attestation
             return
         }
 
-        if publishedCommitment.AttestedWeight.Compute(func(currentWeight uint64) uint64 {
+        var wasAttested, attestationsUpdated bool
+        publishedCommitment.AttestedWeight.Compute(func(currentWeight uint64) uint64 {
+            wasAttested = currentWeight > 0
+
             if !publishedCommitment.RequestAttestations.Get() {
                 a.LogTrace("received attestations for previously attested commitment", "commitment", publishedCommitment.LogName())
 
@@ -189,12 +195,14 @@ func (a *Attestations) processResponse(commitment *model.Commitment, attestation
                 return currentWeight
             }
 
-            if actualWeight > currentWeight {
-                a.LogDebug("received response", "commitment", publishedCommitment.LogName(), "fromPeer", from)
+            if attestationsUpdated = actualWeight > currentWeight; attestationsUpdated {
+                a.LogDebug("received response", "commitment", publishedCommitment.LogName(), "weight", actualWeight, "fromPeer", from)
             }
 
             return actualWeight
-        }) > 0 {
+        })
+
+        if !wasAttested && attestationsUpdated {
             publishedCommitment.IsAttested.Set(true)
         }
     })
diff --git a/pkg/protocol/chain.go b/pkg/protocol/chain.go
index e70f12be1..366f138a1 100644
--- a/pkg/protocol/chain.go
+++ b/pkg/protocol/chain.go
@@ -33,18 +33,6 @@ type Chain struct {
     // corresponding blocks in the Engine.
     LatestProducedCommitment reactive.Variable[*Commitment]
 
-    // ClaimedWeight contains the claimed weight of this chain which is derived from the cumulative weight of the
-    // LatestCommitment.
-    ClaimedWeight reactive.Variable[uint64]
-
-    // AttestedWeight contains the attested weight of this chain which is derived from the cumulative weight of all
-    // attestations up to the LatestAttestedCommitment.
-    AttestedWeight reactive.Variable[uint64]
-
-    // VerifiedWeight contains the verified weight of this chain which is derived from the cumulative weight of the
-    // latest verified commitment.
-    VerifiedWeight reactive.Variable[uint64]
-
     // WarpSyncMode contains a flag that indicates whether this chain is in warp sync mode.
WarpSyncMode reactive.Variable[bool] @@ -88,9 +76,6 @@ func newChain(chains *Chains) *Chain { LatestCommitment: reactive.NewVariable[*Commitment](), LatestAttestedCommitment: reactive.NewVariable[*Commitment](), LatestProducedCommitment: reactive.NewVariable[*Commitment](), - ClaimedWeight: reactive.NewVariable[uint64](), - AttestedWeight: reactive.NewVariable[uint64](), - VerifiedWeight: reactive.NewVariable[uint64](), WarpSyncMode: reactive.NewVariable[bool]().Init(true), LatestSyncedSlot: reactive.NewVariable[iotago.SlotIndex](), OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), @@ -189,9 +174,6 @@ func (c *Chain) initLogger() (shutdown func()) { c.LatestSyncedSlot.LogUpdates(c, log.LevelTrace, "LatestSyncedSlot"), c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"), c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName), - c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"), - c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"), - c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"), c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName), c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName), c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName), @@ -207,9 +189,6 @@ func (c *Chain) initLogger() (shutdown func()) { // initDerivedProperties initializes the behavior of this chain by setting up the relations between its properties. func (c *Chain) initDerivedProperties() (shutdown func()) { return lo.Batch( - c.deriveClaimedWeight(), - c.deriveVerifiedWeight(), - c.deriveLatestAttestedWeight(), c.deriveWarpSyncMode(), c.ForkingPoint.WithValue(c.deriveParentChain), @@ -231,39 +210,6 @@ func (c *Chain) deriveWarpSyncMode() func() { }, c.LatestSyncedSlot, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncMode.Get())) } -// deriveClaimedWeight defines how a chain determines its claimed weight (by setting the cumulative weight of the -// latest commitment). -func (c *Chain) deriveClaimedWeight() (shutdown func()) { - return c.ClaimedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestCommitment *Commitment) uint64 { - if latestCommitment == nil { - return 0 - } - - return latestCommitment.CumulativeWeight() - }, c.LatestCommitment)) -} - -// deriveLatestAttestedWeight defines how a chain determines its attested weight (by inheriting the cumulative attested -// weight of the latest attested commitment). It uses inheritance instead of simply setting the value as the cumulative -// attested weight can change over time depending on the attestations that are received. -func (c *Chain) deriveLatestAttestedWeight() func() { - return c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) { - return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight) - }) -} - -// deriveVerifiedWeight defines how a chain determines its verified weight (by setting the cumulative weight of the -// latest produced commitment). 
-func (c *Chain) deriveVerifiedWeight() func() { - return c.VerifiedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestProducedCommitment *Commitment) uint64 { - if latestProducedCommitment == nil { - return 0 - } - - return latestProducedCommitment.CumulativeWeight() - }, c.LatestProducedCommitment)) -} - // deriveChildChains defines how a chain determines its ChildChains (by adding each child to the set). func (c *Chain) deriveChildChains(child *Chain) func() { c.ChildChains.Add(child) @@ -353,21 +299,3 @@ func (c *Chain) dispatchBlockToSpawnedEngine(block *model.Block, src peer.ID) (d return true } - -// claimedWeight is a getter for the ClaimedWeight variable of this chain, which is internally used to be able to -// "address" the variable across multiple chains in a generic way. -func (c *Chain) claimedWeight() reactive.Variable[uint64] { - return c.ClaimedWeight -} - -// verifiedWeight is a getter for the VerifiedWeight variable of this chain, which is internally used to be able to -// "address" the variable across multiple chains in a generic way. -func (c *Chain) verifiedWeight() reactive.Variable[uint64] { - return c.VerifiedWeight -} - -// attestedWeight is a getter for the AttestedWeight variable of this chain, which is internally used to be able to -// "address" the variable across multiple chains in a generic way. -func (c *Chain) attestedWeight() reactive.Variable[uint64] { - return c.AttestedWeight -} diff --git a/pkg/protocol/chains.go b/pkg/protocol/chains.go index cff02abd1..fe2f2d61c 100644 --- a/pkg/protocol/chains.go +++ b/pkg/protocol/chains.go @@ -6,6 +6,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/iotaledger/hive.go/ds/reactive" + "github.com/iotaledger/hive.go/ds/shrinkingmap" "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/hive.go/log" "github.com/iotaledger/iota-core/pkg/model" @@ -13,6 +14,8 @@ import ( iotago "github.com/iotaledger/iota.go/v4" ) +// region Chains /////////////////////////////////////////////////////////////////////////////////////////////////////// + // Chains is a subcomponent of the protocol that exposes the chains that are managed by the protocol and that implements // the chain switching logic. type Chains struct { @@ -23,13 +26,13 @@ type Chains struct { Main reactive.Variable[*Chain] // HeaviestClaimedCandidate contains the candidate chain with the heaviest claimed weight according to its latest commitment. The weight has neither been checked via attestations nor verified by downloading all data. - HeaviestClaimedCandidate reactive.Variable[*Chain] + HeaviestClaimedCandidate *ChainsCandidate // HeaviestAttestedCandidate contains the candidate chain with the heaviest weight as checked by attestations. The chain has not been instantiated into an engine yet. - HeaviestAttestedCandidate reactive.Variable[*Chain] + HeaviestAttestedCandidate *ChainsCandidate // HeaviestVerifiedCandidate contains the candidate chain with the heaviest verified weight, meaning the chain has been instantiated into an engine and the commitments have been produced by the engine itself. - HeaviestVerifiedCandidate reactive.Variable[*Chain] + HeaviestVerifiedCandidate *ChainsCandidate // LatestSeenSlot contains the slot of the latest commitment of any received block. LatestSeenSlot reactive.Variable[iotago.SlotIndex] @@ -44,15 +47,16 @@ type Chains struct { // newChains creates a new chains instance for the given protocol. 
func newChains(protocol *Protocol) *Chains { c := &Chains{ - Set: reactive.NewSet[*Chain](), - Main: reactive.NewVariable[*Chain](), - HeaviestClaimedCandidate: reactive.NewVariable[*Chain](), - HeaviestAttestedCandidate: reactive.NewVariable[*Chain](), - HeaviestVerifiedCandidate: reactive.NewVariable[*Chain](), - LatestSeenSlot: reactive.NewVariable[iotago.SlotIndex](increasing[iotago.SlotIndex]), - protocol: protocol, + Set: reactive.NewSet[*Chain](), + Main: reactive.NewVariable[*Chain](), + LatestSeenSlot: reactive.NewVariable[iotago.SlotIndex](increasing[iotago.SlotIndex]), + protocol: protocol, } + c.HeaviestClaimedCandidate = newChainsCandidate(c, (*Commitment).cumulativeWeight) + c.HeaviestAttestedCandidate = newChainsCandidate(c, (*Commitment).cumulativeAttestedWeight) + c.HeaviestVerifiedCandidate = newChainsCandidate(c, (*Commitment).cumulativeVerifiedWeight) + shutdown := lo.Batch( c.initLogger(protocol.NewChildLogger("Chains")), c.initChainSwitching(), @@ -98,68 +102,58 @@ func (c *Chains) initChainSwitching() (shutdown func()) { c.Main.Set(mainChain) - // only switch to the heavier chain if the latest commitment is enough slots away from the forking point. - forkingPointBelowChainSwitchingThreshold := func(chain *Chain) func(_ *Commitment, latestCommitment *Commitment) bool { - return func(_ *Commitment, latestCommitment *Commitment) bool { - forkingPoint := chain.ForkingPoint.Get() - chainSwitchingThreshold := iotago.SlotIndex(c.protocol.APIForSlot(latestCommitment.Slot()).ProtocolParameters().ChainSwitchingThreshold()) - - return forkingPoint != nil && latestCommitment != nil && (latestCommitment.ID().Slot()-forkingPoint.ID().Slot()) > chainSwitchingThreshold - } - } - return lo.Batch( c.HeaviestClaimedCandidate.WithNonEmptyValue(func(heaviestClaimedCandidate *Chain) (shutdown func()) { return heaviestClaimedCandidate.RequestAttestations.ToggleValue(true) }), - c.HeaviestAttestedCandidate.WithNonEmptyValue(func(heaviestAttestedCandidate *Chain) (shutdown func()) { - return heaviestAttestedCandidate.LatestAttestedCommitment.OnUpdateOnce(func(_ *Commitment, _ *Commitment) { + c.HeaviestAttestedCandidate.OnUpdate(func(_ *Chain, heaviestAttestedCandidate *Chain) { + if heaviestAttestedCandidate != nil { heaviestAttestedCandidate.StartEngine.Set(true) - }, forkingPointBelowChainSwitchingThreshold(heaviestAttestedCandidate)) + } }), - c.HeaviestVerifiedCandidate.WithNonEmptyValue(func(heaviestVerifiedCandidate *Chain) (shutdown func()) { - return heaviestVerifiedCandidate.LatestProducedCommitment.OnUpdateOnce(func(_ *Commitment, latestProducedCommitment *Commitment) { + c.HeaviestVerifiedCandidate.OnUpdate(func(_ *Chain, heaviestVerifiedCandidate *Chain) { + if heaviestVerifiedCandidate != nil { c.Main.Set(heaviestVerifiedCandidate) - }, forkingPointBelowChainSwitchingThreshold(heaviestVerifiedCandidate)) + } }), - c.WithElements(func(candidateChain *Chain) (shutdown func()) { - return lo.Batch( - c.initHeaviestCandidateTracking(c.HeaviestClaimedCandidate, (*Chain).claimedWeight, candidateChain), - c.initHeaviestCandidateTracking(c.HeaviestVerifiedCandidate, (*Chain).verifiedWeight, candidateChain), - c.initHeaviestCandidateTracking(c.HeaviestAttestedCandidate, (*Chain).attestedWeight, candidateChain), - ) - }), + c.WithElements(c.trackHeaviestCandidates), + c.LatestSeenSlot.WithNonEmptyValue(c.updateMeasuredSlot), ) } -// initHeaviestCandidateTracking initializes the tracking of the heaviest candidates according to the given parameters. 
-func (c *Chains) initHeaviestCandidateTracking(candidateVar reactive.Variable[*Chain], weightVar func(*Chain) reactive.Variable[uint64], newCandidate *Chain) (unsubscribe func()) { - return weightVar(newCandidate).OnUpdate(func(_ uint64, newWeight uint64) { - // abort if the candidate is not heavier than the main chain. - if mainChain := c.Main.Get(); newCandidate == mainChain || newWeight <= mainChain.VerifiedWeight.Get() { - return +func (c *Chains) trackHeaviestCandidates(chain *Chain) (teardown func()) { + return chain.LatestCommitment.OnUpdate(func(_ *Commitment, latestCommitment *Commitment) { + targetSlot := latestCommitment.ID().Index() + + if evictionEvent := c.protocol.EvictionEvent(targetSlot); !evictionEvent.WasTriggered() { + c.HeaviestClaimedCandidate.registerCommitment(targetSlot, latestCommitment, evictionEvent) + c.HeaviestAttestedCandidate.registerCommitment(targetSlot, latestCommitment, evictionEvent) + c.HeaviestVerifiedCandidate.registerCommitment(targetSlot, latestCommitment, evictionEvent) } + }) +} - // atomically replace the existing candidate if the new one is heavier. - candidateVar.Compute(func(currentCandidate *Chain) *Chain { - if currentCandidate != nil && !currentCandidate.IsEvicted.WasTriggered() && newWeight <= weightVar(currentCandidate).Get() { - return currentCandidate - } +func (c *Chains) updateMeasuredSlot(latestSeenSlot iotago.SlotIndex) (teardown func()) { + measuredSlot := latestSeenSlot - chainSwitchingMeasurementOffset - return newCandidate - }) - }, true) + return lo.Batch( + c.HeaviestClaimedCandidate.measureAt(measuredSlot), + c.HeaviestAttestedCandidate.measureAt(measuredSlot), + c.HeaviestVerifiedCandidate.measureAt(measuredSlot), + ) } // deriveLatestSeenSlot derives the latest seen slot from the protocol. func (c *Chains) deriveLatestSeenSlot(protocol *Protocol) func() { return protocol.Engines.Main.WithNonEmptyValue(func(mainEngine *engine.Engine) (shutdown func()) { return lo.Batch( - mainEngine.Initialized.OnTrigger(func() { - c.LatestSeenSlot.Set(mainEngine.LatestCommitment.Get().Slot()) + c.WithInitializedEngines(func(_ *Chain, engine *engine.Engine) (shutdown func()) { + return engine.LatestCommitment.OnUpdate(func(_ *model.Commitment, latestCommitment *model.Commitment) { + c.LatestSeenSlot.Set(latestCommitment.Slot()) + }) }), protocol.Network.OnBlockReceived(func(block *model.Block, src peer.ID) { @@ -183,3 +177,101 @@ func (c *Chains) newChain() *Chain { func increasing[T cmp.Ordered](currentValue T, newValue T) T { return max(currentValue, newValue) } + +const chainSwitchingMeasurementOffset iotago.SlotIndex = 1 + +// endregion /////////////////////////////////////////////////////////////////////////////////////////////////////////// + +// region ChainsCandidate ////////////////////////////////////////////////////////////////////////////////////////////// + +// ChainsCandidate implements a wrapper for the logic of tracking the heaviest candidate of all Chains in respect to +// some monitored weight variable. +type ChainsCandidate struct { + // Variable contains the heaviest chain candidate. + reactive.Variable[*Chain] + + // chains contains a reference to the Chains instance that this candidate belongs to. + chains *Chains + + // weightVariable contains the weight variable that is used to determine the heaviest chain candidate. + weightVariable func(element *Commitment) reactive.Variable[uint64] + + // sortedCommitmentsBySlot contains the sorted commitments for each slot. 
+    sortedCommitmentsBySlot *shrinkingmap.ShrinkingMap[iotago.SlotIndex, reactive.SortedSet[*Commitment]]
+}
+
+// newChainsCandidate creates a new heaviest chain candidate.
+func newChainsCandidate(chains *Chains, weightVariable func(element *Commitment) reactive.Variable[uint64]) *ChainsCandidate {
+    return &ChainsCandidate{
+        Variable:                reactive.NewVariable[*Chain](),
+        chains:                  chains,
+        sortedCommitmentsBySlot: shrinkingmap.New[iotago.SlotIndex, reactive.SortedSet[*Commitment]](),
+        weightVariable:          weightVariable,
+    }
+}
+
+// measureAt measures the heaviest chain candidate at the given slot and updates the variable as soon as the threshold
+// of chainSwitchingThreshold slots with the same heaviest chain in respect to the given slot is reached.
+func (c *ChainsCandidate) measureAt(slot iotago.SlotIndex) (teardown func()) {
+    // sanitize protocol parameters
+    chainSwitchingThreshold := c.chains.protocol.APIForSlot(slot).ProtocolParameters().ChainSwitchingThreshold()
+    if slot < iotago.SlotIndex(chainSwitchingThreshold) {
+        return
+    }
+
+    // abort if no commitment exists for this slot
+    sortedCommitments, sortedCommitmentsExist := c.sortedCommitmentsBySlot.Get(slot)
+    if !sortedCommitmentsExist {
+        return
+    }
+
+    // make sure the heaviest commitment was the heaviest for the last chainSwitchingThreshold slots before we update
+    return sortedCommitments.HeaviestElement().WithNonEmptyValue(func(heaviestCommitment *Commitment) (teardown func()) {
+        // abort if the heaviest commitment is the main chain
+        heaviestChain := heaviestCommitment.Chain.Get()
+        if heaviestChain == c.chains.Main.Get() {
+            return
+        }
+
+        // create counter for the number of slots with the same chain
+        slotsWithSameChain := reactive.NewCounter[*Commitment](func(commitment *Commitment) bool {
+            return commitment.Chain.Get() == heaviestChain
+        })
+
+        // reactively counts the number of slots with the same chain
+        var teardownMonitoringFunctions []func()
+        for i := uint8(1); i < chainSwitchingThreshold; i++ {
+            if earlierCommitments, earlierCommitmentsExist := c.sortedCommitmentsBySlot.Get(slot - iotago.SlotIndex(i)); earlierCommitmentsExist {
+                teardownMonitoringFunctions = append(teardownMonitoringFunctions, slotsWithSameChain.Monitor(earlierCommitments.HeaviestElement()))
+            }
+        }
+
+        // reactively update the value in respect to the reached threshold
+        teardownUpdates := slotsWithSameChain.OnUpdate(func(_ int, slotsWithSameChain int) {
+            if slotsWithSameChain >= int(chainSwitchingThreshold)-1 {
+                c.Set(heaviestChain)
+            } else {
+                c.Set(nil)
+            }
+        })
+
+        // return all teardown functions
+        return lo.Batch(append(teardownMonitoringFunctions, teardownUpdates)...)
+    })
+}
+
+// registerCommitment registers the given commitment for the given slot, which makes it become part of the weight
+// measurement process.
+func (c *ChainsCandidate) registerCommitment(slot iotago.SlotIndex, commitment *Commitment, evictionEvent reactive.Event) {
+    sortedCommitments, slotCreated := c.sortedCommitmentsBySlot.GetOrCreate(slot, func() reactive.SortedSet[*Commitment] {
+        return reactive.NewSortedSet(c.weightVariable)
+    })
+
+    if slotCreated {
+        evictionEvent.OnTrigger(func() { c.sortedCommitmentsBySlot.Delete(slot) })
+    }
+
+    sortedCommitments.Add(commitment)
+}
+
+// endregion ///////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/pkg/protocol/commitment.go b/pkg/protocol/commitment.go
index c9df921fa..d6455ef92 100644
--- a/pkg/protocol/commitment.go
+++ b/pkg/protocol/commitment.go
@@ -1,6 +1,7 @@
 package protocol
 
 import (
+    "bytes"
     "fmt"
 
     "github.com/iotaledger/hive.go/ds"
@@ -47,9 +48,15 @@ type Commitment struct {
     // AttestedWeight contains the weight of the Commitment that was attested by other nodes.
     AttestedWeight reactive.Variable[uint64]
 
+    // CumulativeWeight contains the cumulative weight of all Commitments up to this point.
+    CumulativeWeight reactive.Variable[uint64]
+
     // CumulativeAttestedWeight contains the cumulative weight of all attested Commitments up to this point.
     CumulativeAttestedWeight reactive.Variable[uint64]
 
+    // CumulativeVerifiedWeight contains the cumulative weight of all verified Commitments up to this point.
+    CumulativeVerifiedWeight reactive.Variable[uint64]
+
     // IsRoot contains a flag indicating if this Commitment is the root of the Chain.
     IsRoot reactive.Event
 
@@ -97,7 +104,9 @@ func newCommitment(commitments *Commitments, model *model.Commitment) *Commitmen
         BlocksToWarpSync:         reactive.NewVariable[ds.Set[iotago.BlockID]](),
         Weight:                   reactive.NewVariable[uint64](),
         AttestedWeight:           reactive.NewVariable[uint64](func(currentValue uint64, newValue uint64) uint64 { return max(currentValue, newValue) }),
+        CumulativeWeight:         reactive.NewVariable[uint64](),
         CumulativeAttestedWeight: reactive.NewVariable[uint64](),
+        CumulativeVerifiedWeight: reactive.NewVariable[uint64](),
         IsRoot:                   reactive.NewEvent(),
         IsAttested:               reactive.NewEvent(),
         IsSynced:                 reactive.NewEvent(),
@@ -128,6 +137,71 @@ func (c *Commitment) TargetEngine() *engine.Engine {
     return nil
 }
 
+// Less is a function that is used to break ties between two Commitments that have the same cumulative weight by using
+// the ID of their divergence points (the first commitment that is different between their chains).
+func (c *Commitment) Less(other *Commitment) bool {
+    // trivial case where both commitments are the same or one of them is nil
+    if c == other {
+        return false
+    } else if c == nil {
+        return true
+    } else if other == nil {
+        return false
+    }
+
+    // trivial case where both commitments have the same chain
+    largerChain, smallerChain := other.Chain.Get(), c.Chain.Get()
+    if largerChain == smallerChain {
+        return false
+    }
+
+    // iterate until we find the divergence point of both chains
+    for {
+        // trivial case where one of the chains is nil
+        if largerChain == nil {
+            return false
+        } else if smallerChain == nil {
+            return true
+        }
+
+        // trivial case where one of the chains has no forking point, yet
+        forkingPointOfLargerChain, forkingPointOfSmallerChain := largerChain.ForkingPoint.Get(), smallerChain.ForkingPoint.Get()
+        if forkingPointOfLargerChain == nil {
+            return false
+        } else if forkingPointOfSmallerChain == nil {
+            return true
+        }
+
+        // if the forking points of both chains have the same parent, then the forking points are the divergence points
+        if forkingPointOfLargerChain.Slot() == forkingPointOfSmallerChain.Slot() && forkingPointOfLargerChain.Parent.Get() == forkingPointOfSmallerChain.Parent.Get() {
+            return bytes.Compare(lo.PanicOnErr(forkingPointOfLargerChain.ID().Bytes()), lo.PanicOnErr(forkingPointOfSmallerChain.ID().Bytes())) > 0
+        }
+
+        // iterate by traversing the parent of the chain with the higher forking point first
+        if forkingPointOfLargerChain.Slot() > forkingPointOfSmallerChain.Slot() {
+            // iterate to parent
+            largerChain = largerChain.ParentChain.Get()
+
+            // terminate if we reach a common chain
+            if largerChain == smallerChain {
+                divergencePointB, divergencePointBExists := smallerChain.Commitment(forkingPointOfLargerChain.Slot())
+
+                return !divergencePointBExists || bytes.Compare(lo.PanicOnErr(forkingPointOfLargerChain.ID().Bytes()), lo.PanicOnErr(divergencePointB.ID().Bytes())) > 0
+            }
+        } else {
+            // iterate to parent
+            smallerChain = smallerChain.ParentChain.Get()
+
+            // terminate if we reach a common chain
+            if smallerChain == largerChain {
+                divergencePointA, divergencePointAExists := largerChain.Commitment(forkingPointOfSmallerChain.Slot())
+
+                return divergencePointAExists && bytes.Compare(lo.PanicOnErr(divergencePointA.ID().Bytes()), lo.PanicOnErr(forkingPointOfSmallerChain.ID().Bytes())) > 0
+            }
+        }
+    }
+}
+
 // initLogger initializes the Logger of this Commitment.
func (c *Commitment) initLogger() (shutdown func()) { c.Logger = c.commitments.NewChildLogger(fmt.Sprintf("Slot%d.", c.Slot()), true) @@ -140,7 +214,9 @@ func (c *Commitment) initLogger() (shutdown func()) { c.WarpSyncBlocks.LogUpdates(c, log.LevelTrace, "WarpSyncBlocks"), c.Weight.LogUpdates(c, log.LevelTrace, "Weight"), c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"), + c.CumulativeWeight.LogUpdates(c, log.LevelTrace, "CumulativeWeight"), c.CumulativeAttestedWeight.LogUpdates(c, log.LevelTrace, "CumulativeAttestedWeight"), + c.CumulativeVerifiedWeight.LogUpdates(c, log.LevelTrace, "CumulativeVerifiedWeight"), c.IsRoot.LogUpdates(c, log.LevelTrace, "IsRoot"), c.IsAttested.LogUpdates(c, log.LevelTrace, "IsAttested"), c.IsSynced.LogUpdates(c, log.LevelTrace, "IsSynced"), @@ -163,10 +239,12 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) { c.IsAttested.InheritFrom(c.IsVerified), c.IsSynced.InheritFrom(c.IsVerified), + c.deriveCumulativeVerifiedWeight(), + c.Parent.WithNonEmptyValue(func(parent *Commitment) func() { - // the weight can be fixed as a one time operation (it only relies on static information) - if parent.CumulativeWeight() < c.CumulativeWeight() { - c.Weight.Set(c.CumulativeWeight() - parent.CumulativeWeight()) + if parent.Commitment.CumulativeWeight() <= c.Commitment.CumulativeWeight() { // prevent overflow in uint64 + c.Weight.Set(c.Commitment.CumulativeWeight() - parent.Commitment.CumulativeWeight()) + c.CumulativeWeight.Set(c.Commitment.CumulativeWeight()) } return lo.Batch( @@ -258,6 +336,14 @@ func (c *Commitment) deriveCumulativeAttestedWeight(parent *Commitment) func() { }, parent.CumulativeAttestedWeight, c.AttestedWeight)) } +// deriveCumulativeVerifiedWeight derives the CumulativeVerifiedWeight of this Commitment which is the same as the +// CumulativeWeight of the underlying model.Commitment if this Commitment is verified. +func (c *Commitment) deriveCumulativeVerifiedWeight() func() { + return c.IsVerified.OnTrigger(func() { + c.CumulativeVerifiedWeight.Set(c.Commitment.CumulativeWeight()) + }) +} + // deriveIsAboveLatestVerifiedCommitment derives the IsAboveLatestVerifiedCommitment flag of this Commitment which is // true if the parent is already above the latest verified Commitment or if the parent is verified and we are not. func (c *Commitment) deriveIsAboveLatestVerifiedCommitment(parent *Commitment) func() { @@ -299,3 +385,18 @@ func (c *Commitment) forceChain(targetChain *Chain) { } } } + +// cumulativeWeight returns the Variable that contains the cumulative weight of this Commitment. +func (c *Commitment) cumulativeWeight() reactive.Variable[uint64] { + return c.CumulativeWeight +} + +// cumulativeAttestedWeight returns the Variable that contains the cumulative attested weight of this Commitment. +func (c *Commitment) cumulativeAttestedWeight() reactive.Variable[uint64] { + return c.CumulativeAttestedWeight +} + +// cumulativeVerifiedWeight returns the Variable that contains the cumulative verified weight of this Commitment. +func (c *Commitment) cumulativeVerifiedWeight() reactive.Variable[uint64] { + return c.CumulativeVerifiedWeight +} diff --git a/pkg/protocol/commitment_verifier.go b/pkg/protocol/commitment_verifier.go index d7a6ce5fa..ed176efb6 100644 --- a/pkg/protocol/commitment_verifier.go +++ b/pkg/protocol/commitment_verifier.go @@ -128,7 +128,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio // than it actually is. 
Nodes might consider to switch to this chain, even though it is invalid which will be discovered // before the candidate chain/engine is activated (it will never get heavier than the current chain). if seatCount > commitment.Weight.Get() { - return nil, 0, ierrors.Errorf("invalid cumulative weight for commitment %s: expected %d, got %d", commitment.ID(), commitment.CumulativeWeight(), seatCount) + return nil, 0, ierrors.Errorf("calculated weight from attestations (%d) is higher than weight of commitment (%d) for commitment %s", seatCount, commitment.Weight.Get(), commitment.ID()) } return blockIDs, seatCount, nil diff --git a/pkg/protocol/commitments.go b/pkg/protocol/commitments.go index 8e2caeb34..28ccb27a4 100644 --- a/pkg/protocol/commitments.go +++ b/pkg/protocol/commitments.go @@ -327,8 +327,16 @@ func (c *Commitments) processRequest(commitmentID iotago.CommitmentID, from peer // processResponse processes the given commitment response. func (c *Commitments) processResponse(commitment *model.Commitment, from peer.ID) { c.workerPool.Submit(func() { + // make sure the main engine is available to process the response + mainEngine := c.protocol.Engines.Main.Get() + if mainEngine == nil { + c.LogError("main engine unavailable for response", "commitment", commitment.ID(), "fromPeer", from) + + return + } + // verify the commitment's version corresponds to the protocol version for the slot. - if apiForSlot := c.protocol.APIForSlot(commitment.Slot()); apiForSlot.Version() != commitment.Commitment().ProtocolVersion { + if apiForSlot := mainEngine.APIForSlot(commitment.Slot()); apiForSlot.Version() != commitment.Commitment().ProtocolVersion { c.LogDebug("received commitment with invalid protocol version", "commitment", commitment.ID(), "version", commitment.Commitment().ProtocolVersion, "expectedVersion", apiForSlot.Version(), "fromPeer", from) return diff --git a/pkg/protocol/engine/blockdag/blockdag.go b/pkg/protocol/engine/blockdag/blockdag.go index af9e1e8de..75392cb60 100644 --- a/pkg/protocol/engine/blockdag/blockdag.go +++ b/pkg/protocol/engine/blockdag/blockdag.go @@ -1,6 +1,7 @@ package blockdag import ( + "github.com/iotaledger/hive.go/log" "github.com/iotaledger/hive.go/runtime/module" "github.com/iotaledger/iota-core/pkg/model" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" @@ -19,5 +20,7 @@ type BlockDAG interface { // Reset resets the component to a clean state as if it was created at the last commitment. Reset() + log.Logger + module.Interface } diff --git a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go index cb45076d9..b3330d6d3 100644 --- a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go +++ b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go @@ -4,6 +4,7 @@ import ( "sync/atomic" "github.com/iotaledger/hive.go/ierrors" + "github.com/iotaledger/hive.go/log" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/module" "github.com/iotaledger/hive.go/runtime/options" @@ -37,18 +38,20 @@ type BlockDAG struct { errorHandler func(error) module.Module + + log.Logger } func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engine, blockdag.BlockDAG] { return module.Provide(func(e *engine.Engine) blockdag.BlockDAG { - b := New(e.Workers.CreateGroup("BlockDAG"), int(e.Storage.Settings().APIProvider().CommittedAPI().ProtocolParameters().MaxCommittableAge())*2, e.EvictionState, e.BlockCache, e.ErrorHandler("blockdag"), opts...) 
+ b := New(e.Logger.NewChildLogger("BlockDAG"), e.Workers.CreateGroup("BlockDAG"), int(e.Storage.Settings().APIProvider().CommittedAPI().ProtocolParameters().MaxCommittableAge())*2, e.EvictionState, e.BlockCache, e.ErrorHandler("blockdag"), opts...) e.Constructed.OnTrigger(func() { wp := b.workers.CreatePool("BlockDAG.Attach", workerpool.WithWorkerCount(2)) e.Events.PreSolidFilter.BlockPreAllowed.Hook(func(block *model.Block) { if _, _, err := b.Attach(block); err != nil { - b.errorHandler(ierrors.Wrapf(err, "failed to attach block with %s (issuerID: %s)", block.ID(), block.ProtocolBlock().Header.IssuerID)) + b.LogError("failed to attach block", "blockID", block.ID(), "issuer", block.ProtocolBlock().Header.IssuerID, "err", err) } }, event.WithWorkerPool(wp)) @@ -99,8 +102,9 @@ func (b *BlockDAG) setupBlock(block *blocks.Block) { } // New is the constructor for the BlockDAG and creates a new BlockDAG instance. -func New(workers *workerpool.Group, unsolidCommitmentBufferSize int, evictionState *eviction.State, blockCache *blocks.Blocks, errorHandler func(error), opts ...options.Option[BlockDAG]) (newBlockDAG *BlockDAG) { +func New(logger log.Logger, workers *workerpool.Group, unsolidCommitmentBufferSize int, evictionState *eviction.State, blockCache *blocks.Blocks, errorHandler func(error), opts ...options.Option[BlockDAG]) (newBlockDAG *BlockDAG) { return options.Apply(&BlockDAG{ + Logger: logger, events: blockdag.NewEvents(), evictionState: evictionState, blockCache: blockCache, @@ -115,8 +119,6 @@ var _ blockdag.BlockDAG = new(BlockDAG) // Attach is used to attach new Blocks to the BlockDAG. It is the main function of the BlockDAG that triggers Events. func (b *BlockDAG) Attach(data *model.Block) (block *blocks.Block, wasAttached bool, err error) { if block, wasAttached, err = b.attach(data); wasAttached { - b.events.BlockAttached.Trigger(block) - // We add blocks that commit to a commitment we haven't committed ourselves yet to this limited size buffer and // only let them become solid once we committed said slot ourselves (to the same commitment). // This is necessary in order to make sure that all necessary state is available after a block is solid (specifically @@ -131,6 +133,10 @@ func (b *BlockDAG) Attach(data *model.Block) (block *blocks.Block, wasAttached b return } + b.LogTrace("block attached", "block", block.ID()) + + b.events.BlockAttached.Trigger(block) + b.setupBlock(block) } @@ -179,6 +185,8 @@ func (b *BlockDAG) attach(data *model.Block) (block *blocks.Block, wasAttached b } if updated { + b.LogTrace("missing block attached", "block", block.ID()) + b.events.MissingBlockAttached.Trigger(block) } diff --git a/pkg/protocol/engine/engine.go b/pkg/protocol/engine/engine.go index f3f3ae001..65622aa2c 100644 --- a/pkg/protocol/engine/engine.go +++ b/pkg/protocol/engine/engine.go @@ -229,7 +229,7 @@ func New( e.Initialized.Trigger() - e.LogDebug("initialized", "settings", e.Storage.Settings().String()) + e.LogTrace("initialized", "settings", e.Storage.Settings().String()) }, ) } diff --git a/pkg/protocol/protocol.go b/pkg/protocol/protocol.go index 7af24aeb1..d836bb772 100644 --- a/pkg/protocol/protocol.go +++ b/pkg/protocol/protocol.go @@ -164,9 +164,13 @@ func (p *Protocol) initSubcomponents(networkEndpoint network.Endpoint) (shutdown // initEviction initializes the eviction of old data when the engine advances and returns a function that shuts it down. 
func (p *Protocol) initEviction() (shutdown func()) { + evictionWorker := p.Workers.CreatePool("Eviction").Start() + return p.Commitments.Root.OnUpdate(func(_ *Commitment, rootCommitment *Commitment) { if rootSlot := rootCommitment.Slot(); rootSlot > 0 { - p.Evict(rootSlot - 1) + evictionWorker.Submit(func() { + p.Evict(rootSlot - 1) + }) } }) } diff --git a/pkg/protocol/utils.go b/pkg/protocol/utils.go index c97d5977b..fe060eaf6 100644 --- a/pkg/protocol/utils.go +++ b/pkg/protocol/utils.go @@ -9,7 +9,7 @@ import ( func loggedWorkerPoolTask(workerPool *workerpool.WorkerPool, processRequest func() error, logger log.Logger, loggerArgs ...any) { workerPool.Submit(func() { if err := processRequest(); err != nil { - logger.LogDebug("failed to answer request", append(loggerArgs, "err", err)...) + logger.LogTrace("failed to answer request", append(loggerArgs, "err", err)...) } else { logger.LogTrace("answered request", loggerArgs...) } diff --git a/pkg/protocol/warp_sync.go b/pkg/protocol/warp_sync.go index 05a047b28..7eff35cf2 100644 --- a/pkg/protocol/warp_sync.go +++ b/pkg/protocol/warp_sync.go @@ -234,7 +234,10 @@ func (w *WarpSync) ProcessResponse(commitmentID iotago.CommitmentID, blockIDsByS commitment.IsSynced.OnUpdateOnce(func(_ bool, _ bool) { // update the flag in a worker since it can potentially cause a commit w.workerPool.Submit(func() { - if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - targetEngine.LatestAPI().ProtocolParameters().MinCommittableAge()); exists { + // we add +1 here to enable syncing of chains of empty commitments since we can assume that there is + // at least 1 additional slot building on top of the synced commitment as it would have otherwise + // not turned into a commitment in the first place. + if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - targetEngine.LatestAPI().ProtocolParameters().MinCommittableAge() + 1); exists { committableCommitment.IsCommittable.Set(true) } }) diff --git a/pkg/tests/protocol_engine_switching_test.go b/pkg/tests/protocol_engine_switching_test.go index 1b4e7a6a8..ef4d208d6 100644 --- a/pkg/tests/protocol_engine_switching_test.go +++ b/pkg/tests/protocol_engine_switching_test.go @@ -1,13 +1,16 @@ package tests import ( + "bytes" "context" "fmt" + "strconv" "sync" "testing" "time" "github.com/iotaledger/hive.go/core/eventticker" + "github.com/iotaledger/hive.go/ds" "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/hive.go/runtime/module" "github.com/iotaledger/hive.go/runtime/options" @@ -641,3 +644,338 @@ func TestProtocol_EngineSwitching_CommitteeRotation(t *testing.T) { ts.AssertAttestationsForSlot(18, ts.Blocks("P1:15.3-node0", "P1:18.3-node1", "P1:18.3-node2"), ts.Nodes()...) // We're in Epoch 2 (only node1, node2) but we carry attestations of others because of window ts.AssertAttestationsForSlot(19, ts.Blocks("P1:19.3-node1", "P1:19.3-node2"), ts.Nodes()...) 
// Committee in epoch 2 is only node1, node2 } + +func TestProtocol_EngineSwitching_Tie(t *testing.T) { + var ( + genesisSlot iotago.SlotIndex = 0 + minCommittableAge iotago.SlotIndex = 2 + maxCommittableAge iotago.SlotIndex = 4 + ) + + ts := testsuite.NewTestSuite(t, + testsuite.WithProtocolParametersOptions( + iotago.WithTimeProviderOptions( + genesisSlot, + testsuite.GenesisTimeWithOffsetBySlots(1000, testsuite.DefaultSlotDurationInSeconds), + testsuite.DefaultSlotDurationInSeconds, + 3, + ), + iotago.WithLivenessOptions( + 10, + 10, + minCommittableAge, + maxCommittableAge, + 5, + ), + ), + + testsuite.WithWaitFor(15*time.Second), + ) + defer ts.Shutdown() + + nodes := []*mock.Node{ + ts.AddValidatorNode("node0"), + ts.AddValidatorNode("node1"), + ts.AddValidatorNode("node2"), + } + + validatorsByAccountID := map[iotago.AccountID]*mock.Node{ + nodes[0].Validator.AccountID: nodes[0], + nodes[1].Validator.AccountID: nodes[1], + nodes[2].Validator.AccountID: nodes[2], + } + + ts.AddDefaultWallet(nodes[0]) + + const expectedCommittedSlotAfterPartitionMerge = 18 + + nodeOptions := []options.Option[protocol.Protocol]{ + protocol.WithSybilProtectionProvider( + sybilprotectionv1.NewProvider( + sybilprotectionv1.WithSeatManagerProvider(module.Provide(func(e *engine.Engine) seatmanager.SeatManager { + poa := mock2.NewManualPOAProvider()(e).(*mock2.ManualPOA) + for _, node := range lo.Filter(nodes, (*mock.Node).IsValidator) { + poa.AddAccount(node.Validator.AccountID, node.Name) + } + + onlineValidators := ds.NewSet[string]() + + e.Constructed.OnTrigger(func() { + e.Events.BlockDAG.BlockAttached.Hook(func(block *blocks.Block) { + if node, exists := validatorsByAccountID[block.ModelBlock().ProtocolBlock().Header.IssuerID]; exists && onlineValidators.Add(node.Name) { + e.LogError("node online", "name", node.Name) + poa.SetOnline(onlineValidators.ToSlice()...) 
+ } + }) + }) + + return poa + })), + ), + ), + + protocol.WithEngineOptions( + engine.WithBlockRequesterOptions( + eventticker.RetryInterval[iotago.SlotIndex, iotago.BlockID](1*time.Second), + eventticker.RetryJitter[iotago.SlotIndex, iotago.BlockID](500*time.Millisecond), + ), + ), + + protocol.WithSyncManagerProvider( + trivialsyncmanager.NewProvider( + trivialsyncmanager.WithBootstrappedFunc(func(e *engine.Engine) bool { + return e.Storage.Settings().LatestCommitment().Slot() >= expectedCommittedSlotAfterPartitionMerge && e.Notarization.IsBootstrapped() + }), + ), + ), + + protocol.WithStorageOptions( + storage.WithPruningDelay(20), + ), + } + + nodesOptions := make(map[string][]options.Option[protocol.Protocol]) + for _, node := range ts.Nodes() { + nodesOptions[node.Name] = nodeOptions + } + + ts.Run(false, nodesOptions) + + expectedCommittee := []iotago.AccountID{nodes[0].Validator.AccountID, nodes[1].Validator.AccountID, nodes[2].Validator.AccountID} + + seatIndexes := []account.SeatIndex{ + lo.Return1(lo.Return1(nodes[0].Protocol.Engines.Main.Get().SybilProtection.SeatManager().CommitteeInSlot(1)).GetSeat(nodes[0].Validator.AccountID)), + lo.Return1(lo.Return1(nodes[0].Protocol.Engines.Main.Get().SybilProtection.SeatManager().CommitteeInSlot(1)).GetSeat(nodes[1].Validator.AccountID)), + lo.Return1(lo.Return1(nodes[0].Protocol.Engines.Main.Get().SybilProtection.SeatManager().CommitteeInSlot(1)).GetSeat(nodes[2].Validator.AccountID)), + } + + for _, node := range ts.Nodes() { + node.Protocol.Engines.Main.Get().SybilProtection.SeatManager().(*mock2.ManualPOA).SetOnline("node0", "node1", "node2") + } + + // Verify that nodes have the expected states. + { + genesisCommitment := iotago.NewEmptyCommitment(ts.API) + genesisCommitment.ReferenceManaCost = ts.API.ProtocolParameters().CongestionControlParameters().MinReferenceManaCost + ts.AssertNodeState(ts.Nodes(), + testsuite.WithSnapshotImported(true), + testsuite.WithProtocolParameters(ts.API.ProtocolParameters()), + testsuite.WithLatestCommitment(genesisCommitment), + testsuite.WithLatestFinalizedSlot(0), + testsuite.WithChainID(genesisCommitment.MustID()), + testsuite.WithStorageCommitments([]*iotago.Commitment{genesisCommitment}), + + testsuite.WithSybilProtectionCommittee(0, expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(seatIndexes...), + testsuite.WithEvictedSlot(0), + testsuite.WithActiveRootBlocks(ts.Blocks("Genesis")), + testsuite.WithStorageRootBlocks(ts.Blocks("Genesis")), + ) + } + + nodesInPartition := func(partition int) []*mock.Node { + switch { + case partition == 1: + return nodes[0:1] + case partition == 2: + return nodes[1:2] + case partition == 3: + return nodes[2:3] + default: + return nodes + } + } + + nodesOutsidePartition := func(partition int) []*mock.Node { + switch { + case partition == 1: + return []*mock.Node{nodes[1], nodes[2]} + case partition == 2: + return []*mock.Node{nodes[0], nodes[2]} + case partition == 3: + return []*mock.Node{nodes[0], nodes[1]} + default: + return []*mock.Node{} + } + } + + onlineCommittee := func(partition int) []account.SeatIndex { + switch { + case partition == 1: + return []account.SeatIndex{seatIndexes[0]} + case partition == 2: + return []account.SeatIndex{seatIndexes[1]} + case partition == 3: + return []account.SeatIndex{seatIndexes[2]} + default: + return seatIndexes + } + } + + lastCommonSlot := iotago.SlotIndex(13) + + issueBlocks := func(partition int, slots []iotago.SlotIndex) { + parentSlot := slots[0] - 1 + lastIssuedSlot := slots[len(slots)-1] + 
targetNodes := nodesInPartition(partition) + otherNodes := nodesOutsidePartition(partition) + lastCommittedSlot := lastIssuedSlot - minCommittableAge + + initialParentsPrefix := slotPrefix(partition, parentSlot) + strconv.Itoa(int(parentSlot)) + ".3" + if parentSlot == genesisSlot { + initialParentsPrefix = "Genesis" + } + + ts.IssueBlocksAtSlots(slotPrefix(partition, slots[0]), slots, 4, initialParentsPrefix, targetNodes, true, false) + + cumulativeAttestations := uint64(0) + for slot := genesisSlot + maxCommittableAge; slot <= lastCommittedSlot; slot++ { + var attestationBlocks Blocks + for _, node := range targetNodes { + attestationBlocks.Add(ts, node, partition, slot) + + cumulativeAttestations++ + } + + for _, node := range otherNodes { + if slot <= lastCommonSlot+minCommittableAge { + attestationBlocks.Add(ts, node, partition, min(slot, lastCommonSlot)) // carry forward last known attestations + + cumulativeAttestations++ + } + } + + ts.AssertAttestationsForSlot(slot, attestationBlocks, targetNodes...) + } + + ts.AssertNodeState(targetNodes, + testsuite.WithLatestFinalizedSlot(10), + testsuite.WithLatestCommitmentSlotIndex(lastCommittedSlot), + testsuite.WithEqualStoredCommitmentAtIndex(lastCommittedSlot), + testsuite.WithLatestCommitmentCumulativeWeight(cumulativeAttestations), + testsuite.WithSybilProtectionCommittee(ts.API.TimeProvider().EpochFromSlot(lastCommittedSlot), expectedCommittee), + testsuite.WithSybilProtectionOnlineCommittee(onlineCommittee(partition)...), + testsuite.WithEvictedSlot(lastCommittedSlot), + ) + + var tipBlocks Blocks + for _, node := range targetNodes { + tipBlocks.Add(ts, node, partition, lastIssuedSlot) + } + + ts.AssertStrongTips(tipBlocks, targetNodes...) + } + + issueBlocks(0, []iotago.SlotIndex{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}) + + // Split into partitions P1, P2 and P3. + ts.SplitIntoPartitions(map[string][]*mock.Node{ + "P1": {nodes[0]}, + "P2": {nodes[1]}, + "P3": {nodes[2]}, + }) + + // Set online committee for each partition. 
+ for _, node := range ts.Nodes() { + manualPOA := node.Protocol.Engines.Main.Get().SybilProtection.SeatManager().(*mock2.ManualPOA) + if node.Partition == "P1" { + manualPOA.SetOnline("node0") + manualPOA.SetOffline("node1", "node2") + } else if node.Partition == "P2" { + manualPOA.SetOnline("node1") + manualPOA.SetOffline("node0", "node2") + } else { + manualPOA.SetOnline("node2") + manualPOA.SetOffline("node0", "node1") + } + } + + ts.AssertSybilProtectionOnlineCommittee(seatIndexes[0:1], nodes[0]) + ts.AssertSybilProtectionOnlineCommittee(seatIndexes[1:2], nodes[1]) + ts.AssertSybilProtectionOnlineCommittee(seatIndexes[2:3], nodes[2]) + + issueBlocks(1, []iotago.SlotIndex{14, 15, 16, 17, 18, 19, 20}) + issueBlocks(2, []iotago.SlotIndex{14, 15, 16, 17, 18, 19, 20}) + issueBlocks(3, []iotago.SlotIndex{14, 15, 16, 17, 18, 19, 20}) + + commitment140, _ := nodes[0].Protocol.Chains.Main.Get().Commitment(14) + commitment141, _ := nodes[1].Protocol.Chains.Main.Get().Commitment(14) + commitment142, _ := nodes[2].Protocol.Chains.Main.Get().Commitment(14) + + var mainPartition []*mock.Node + var otherPartitions []*mock.Node + switch commitmentWithLargestID(commitment140, commitment141, commitment142) { + case commitment140: + mainPartition = nodes[0:1] + otherPartitions = []*mock.Node{nodes[1], nodes[2]} + case commitment141: + mainPartition = nodes[1:2] + otherPartitions = []*mock.Node{nodes[0], nodes[2]} + case commitment142: + mainPartition = nodes[2:3] + otherPartitions = []*mock.Node{nodes[0], nodes[1]} + } + + // Merge the partitions + { + fmt.Println("") + fmt.Println("==========================") + fmt.Println("Merging network partitions") + fmt.Println("--------------------------") + fmt.Println("Winner: ", mainPartition[0].Protocol.LogName()) + fmt.Println("Losers: ", otherPartitions[0].Protocol.LogName(), otherPartitions[1].Protocol.LogName()) + fmt.Println("==========================") + fmt.Println("") + + ts.MergePartitionsToMain() + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + ctxP1, ctxP1Cancel := context.WithCancel(ctx) + ctxP2, ctxP2Cancel := context.WithCancel(ctx) + ctxP3, ctxP3Cancel := context.WithCancel(ctx) + + wg := &sync.WaitGroup{} + + // Issue blocks on both partitions after merging the networks. + nodes[0].Validator.IssueActivity(ctxP1, wg, 21, nodes[0]) + nodes[1].Validator.IssueActivity(ctxP2, wg, 21, nodes[1]) + nodes[2].Validator.IssueActivity(ctxP3, wg, 21, nodes[2]) + + ts.AssertMainEngineSwitchedCount(0, mainPartition...) + ts.AssertMainEngineSwitchedCount(1, otherPartitions...) + + ctxP1Cancel() + ctxP2Cancel() + ctxP3Cancel() + wg.Wait() + + ts.AssertEqualStoredCommitmentAtIndex(expectedCommittedSlotAfterPartitionMerge, ts.Nodes()...) 
+} + +type Blocks []*blocks.Block + +func (a *Blocks) Add(ts *testsuite.TestSuite, node *mock.Node, partition int, slot iotago.SlotIndex) { + *a = append(*a, ts.Block(fmt.Sprintf("%s%d.3-%s", slotPrefix(partition, slot), slot, node.Name))) +} + +func slotPrefix(partition int, slot iotago.SlotIndex) string { + if slot <= 13 { + return "P0:" + } + + return "P" + strconv.Itoa(partition) + ":" +} + +func commitmentWithLargestID(commitments ...*protocol.Commitment) *protocol.Commitment { + var largestCommitment *protocol.Commitment + for _, commitment := range commitments { + if largestCommitment == nil || bytes.Compare(lo.PanicOnErr(commitment.ID().Bytes()), lo.PanicOnErr(largestCommitment.ID().Bytes())) > 0 { + largestCommitment = commitment + } + } + + return largestCommitment +} diff --git a/pkg/testsuite/mock/node.go b/pkg/testsuite/mock/node.go index 5e4dabdc3..5c5672c6d 100644 --- a/pkg/testsuite/mock/node.go +++ b/pkg/testsuite/mock/node.go @@ -227,17 +227,11 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi events := instance.Events events.BlockDAG.BlockAttached.Hook(func(block *blocks.Block) { - instance.LogTrace("BlockDAG.BlockAttached", "block", block.ID()) - n.mutex.Lock() defer n.mutex.Unlock() n.attachedBlocks = append(n.attachedBlocks, block) }) - events.BlockDAG.BlockSolid.Hook(func(block *blocks.Block) { - instance.LogTrace("BlockDAG.BlockSolid", "block", block.ID()) - }) - events.BlockDAG.BlockInvalid.Hook(func(block *blocks.Block, err error) { instance.LogTrace("BlockDAG.BlockInvalid", "block", block.ID(), "err", err) })
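
Note on the chain-switching change above: the new ChainsCandidate only adopts a candidate chain once its commitments have been the heaviest for ChainSwitchingThreshold consecutive measured slots, instead of switching on a single weight comparison. The following is a minimal, standalone Go sketch of that selection rule with the reactive plumbing (SortedSet, Counter, eviction events) stripped out; all type and function names here are illustrative stand-ins, not the identifiers used in iota-core.

package main

import "fmt"

// candidate is a simplified stand-in for a (chain, cumulative weight) pair observed at a slot.
type candidate struct {
	chain  string
	weight uint64
}

// heaviestPerSlot mimics ChainsCandidate.sortedCommitmentsBySlot, reduced to the heaviest entry per slot.
type heaviestPerSlot map[int]candidate

// register keeps only the heaviest candidate seen for a slot (the reactive.SortedSet does this in the real code).
func (h heaviestPerSlot) register(slot int, c candidate) {
	if cur, ok := h[slot]; !ok || c.weight > cur.weight {
		h[slot] = c
	}
}

// measureAt returns the chain that should become the new candidate at measuredSlot, or "" if none:
// the same non-main chain must have been the heaviest for `threshold` consecutive slots ending at measuredSlot.
func (h heaviestPerSlot) measureAt(measuredSlot, threshold int, mainChain string) string {
	top, ok := h[measuredSlot]
	if !ok || top.chain == mainChain {
		return ""
	}

	for i := 1; i < threshold; i++ {
		earlier, ok := h[measuredSlot-i]
		if !ok || earlier.chain != top.chain {
			return ""
		}
	}

	return top.chain
}

func main() {
	h := heaviestPerSlot{}
	for slot := 14; slot <= 18; slot++ {
		h.register(slot, candidate{chain: "fork", weight: uint64(slot * 3)})
		h.register(slot, candidate{chain: "main", weight: uint64(slot * 2)})
	}

	// with a threshold of 3, the fork only wins once it has led for 3 consecutive measured slots
	fmt.Println(h.measureAt(15, 3, "main")) // "" (slot 13 was never measured)
	fmt.Println(h.measureAt(18, 3, "main")) // "fork"
}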
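
A tie between equally heavy forks is resolved by Commitment.Less, which ultimately falls back to comparing the IDs of the two chains' divergence points as raw bytes; the new TestProtocol_EngineSwitching_Tie appears to mirror this when it picks the expected winner via commitmentWithLargestID. A minimal sketch of that final comparison, with a hypothetical fixed-size id type standing in for the serialized iotago commitment ID:

package main

import (
	"bytes"
	"fmt"
)

// id is a stand-in for a commitment ID; the real code serializes the ID via ID().Bytes().
type id [4]byte

// lessByDivergencePoint mirrors the terminal case of Commitment.Less: once the divergence points of the
// two chains are known, the commitment whose divergence point has the lexicographically larger ID wins the tie.
func lessByDivergencePoint(divergencePointA, divergencePointB id) bool {
	return bytes.Compare(divergencePointB[:], divergencePointA[:]) > 0
}

func main() {
	a := id{0x01, 0x02, 0x03, 0x04}
	b := id{0x01, 0x02, 0xff, 0x00}

	fmt.Println(lessByDivergencePoint(a, b)) // true: b's divergence-point ID is larger, so a loses the tie
}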