diff --git a/pkg/core/buffer/unsolid_commitment_buffer.go b/pkg/core/buffer/unsolid_commitment_buffer.go index 27df0dd51..a6417a4d7 100644 --- a/pkg/core/buffer/unsolid_commitment_buffer.go +++ b/pkg/core/buffer/unsolid_commitment_buffer.go @@ -51,7 +51,7 @@ func (u *UnsolidCommitmentBuffer[V]) Add(commitmentID iotago.CommitmentID, value u.mutex.RLock() defer u.mutex.RUnlock() - if commitmentID.Slot() <= u.lastEvictedSlot { + if u.lastEvictedSlot != 0 && commitmentID.Slot() <= u.lastEvictedSlot { return false } diff --git a/pkg/protocol/chain.go b/pkg/protocol/chain.go index 8c26912dd..86b1d1d30 100644 --- a/pkg/protocol/chain.go +++ b/pkg/protocol/chain.go @@ -29,6 +29,9 @@ type Chain struct { // LatestAttestedCommitment contains the latest commitment of this chain for which attestations were received. LatestAttestedCommitment reactive.Variable[*Commitment] + // LatestFullyBookedCommitment contains the latest commitment of this chain for which all blocks were booked. + LatestFullyBookedCommitment reactive.Variable[*Commitment] + // LatestProducedCommitment contains the latest commitment of this chain that we produced ourselves by booking the // corresponding blocks in the Engine. LatestProducedCommitment reactive.Variable[*Commitment] @@ -45,12 +48,8 @@ type Chain struct { // latest verified commitment. VerifiedWeight reactive.Variable[uint64] - // WarpSyncMode contains a flag that indicates whether this chain is in warp sync mode. - WarpSyncMode reactive.Variable[bool] - - // WarpSyncThreshold contains the slot at which the chain will exit warp sync mode which is derived from the latest - // network slot minus the max committable age. - WarpSyncThreshold reactive.Variable[iotago.SlotIndex] + // WarpSyncModeEnabled contains a flag that indicates whether this chain is in warp sync mode. + WarpSyncModeEnabled reactive.Variable[bool] // OutOfSyncThreshold contains the slot at which the chain will consider itself to be out of sync and switch to warp // sync mode. It is derived from the latest network slot minus two times the max committable age. @@ -83,22 +82,22 @@ type Chain struct { // newChain creates a new chain instance. func newChain(chains *Chains) *Chain { c := &Chain{ - ForkingPoint: reactive.NewVariable[*Commitment](), - ParentChain: reactive.NewVariable[*Chain](), - ChildChains: reactive.NewSet[*Chain](), - LatestCommitment: reactive.NewVariable[*Commitment](), - LatestAttestedCommitment: reactive.NewVariable[*Commitment](), - LatestProducedCommitment: reactive.NewVariable[*Commitment](), - ClaimedWeight: reactive.NewVariable[uint64](), - AttestedWeight: reactive.NewVariable[uint64](), - VerifiedWeight: reactive.NewVariable[uint64](), - WarpSyncMode: reactive.NewVariable[bool](), - WarpSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), - OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), - RequestAttestations: reactive.NewVariable[bool](), - StartEngine: reactive.NewVariable[bool](), - Engine: reactive.NewVariable[*engine.Engine](), - IsEvicted: reactive.NewEvent(), + ForkingPoint: reactive.NewVariable[*Commitment](), + ParentChain: reactive.NewVariable[*Chain](), + ChildChains: reactive.NewSet[*Chain](), + LatestCommitment: reactive.NewVariable[*Commitment](), + LatestAttestedCommitment: reactive.NewVariable[*Commitment](), + LatestFullyBookedCommitment: reactive.NewVariable[*Commitment](), + LatestProducedCommitment: reactive.NewVariable[*Commitment](), + ClaimedWeight: reactive.NewVariable[uint64](), + AttestedWeight: reactive.NewVariable[uint64](), + VerifiedWeight: reactive.NewVariable[uint64](), + WarpSyncModeEnabled: reactive.NewVariable[bool]().Init(true), + OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](), + RequestAttestations: reactive.NewVariable[bool](), + StartEngine: reactive.NewVariable[bool](), + Engine: reactive.NewVariable[*engine.Engine](), + IsEvicted: reactive.NewEvent(), chains: chains, commitments: shrinkingmap.New[iotago.SlotIndex, *Commitment](), @@ -157,6 +156,8 @@ func (c *Chain) DispatchBlock(block *model.Block, src peer.ID) (dispatched bool) func (c *Chain) Commitment(slot iotago.SlotIndex) (commitment *Commitment, exists bool) { for currentChain := c; currentChain != nil; { switch forkingPoint := currentChain.ForkingPoint.Get(); { + case forkingPoint == nil: + return nil, false case forkingPoint.Slot() == slot: return forkingPoint, true case slot > forkingPoint.Slot(): @@ -186,14 +187,15 @@ func (c *Chain) initLogger() (shutdown func()) { c.Logger, shutdown = c.chains.NewEntityLogger("") return lo.Batch( - c.WarpSyncMode.LogUpdates(c, log.LevelTrace, "WarpSyncMode"), - c.WarpSyncThreshold.LogUpdates(c, log.LevelTrace, "WarpSyncThreshold"), + c.WarpSyncModeEnabled.LogUpdates(c, log.LevelTrace, "WarpSyncModeEnabled"), c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"), c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName), c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"), c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"), c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"), c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName), + c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName), + c.LatestFullyBookedCommitment.LogUpdates(c, log.LevelTrace, "LatestFullyBookedCommitment", (*Commitment).LogName), c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName), c.RequestAttestations.LogUpdates(c, log.LevelTrace, "RequestAttestations"), c.StartEngine.LogUpdates(c, log.LevelDebug, "StartEngine"), @@ -207,10 +209,21 @@ func (c *Chain) initLogger() (shutdown func()) { // initDerivedProperties initializes the behavior of this chain by setting up the relations between its properties. func (c *Chain) initDerivedProperties() (shutdown func()) { return lo.Batch( - c.deriveClaimedWeight(), - c.deriveVerifiedWeight(), - c.deriveLatestAttestedWeight(), - c.deriveWarpSyncMode(), + c.ClaimedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestCommitment *Commitment) uint64 { + return latestCommitment.cumulativeWeight() + }, c.LatestCommitment)), + + c.VerifiedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestProducedCommitment *Commitment) uint64 { + return latestProducedCommitment.cumulativeWeight() + }, c.LatestProducedCommitment)), + + c.WarpSyncModeEnabled.DeriveValueFrom(reactive.NewDerivedVariable3(func(enabled bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { + return warpSyncModeEnabled(enabled, latestFullyBookedCommitment, latestSeenSlot, outOfSyncThreshold) + }, c.LatestFullyBookedCommitment, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncModeEnabled.Get())), + + c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) { + return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight) + }), c.ForkingPoint.WithValue(func(forkingPoint *Commitment) (shutdown func()) { return c.deriveParentChain(forkingPoint) @@ -221,66 +234,13 @@ func (c *Chain) initDerivedProperties() (shutdown func()) { }), c.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) { - return lo.Batch( - c.deriveWarpSyncThreshold(c.chains.LatestSeenSlot, engineInstance), - c.deriveOutOfSyncThreshold(c.chains.LatestSeenSlot, engineInstance), - ) + return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { + return outOfSyncThreshold(engineInstance, latestSeenSlot) + }, c.chains.LatestSeenSlot)) }), ) } -// deriveWarpSyncMode defines how a chain determines whether it is in warp sync mode or not. -func (c *Chain) deriveWarpSyncMode() func() { - return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { - // latest produced commitment is nil if we have not produced any commitment yet (intermediary state during - // startup) - if latestProducedCommitment == nil { - return warpSyncMode - } - - // if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold - if warpSyncMode { - return latestProducedCommitment.ID().Slot() < warpSyncThreshold - } - - // if warp sync mode is disabled, enable it only if we fall below the out of sync threshold - return latestProducedCommitment.ID().Slot() < outOfSyncThreshold - }, c.LatestProducedCommitment, c.WarpSyncThreshold, c.OutOfSyncThreshold, c.WarpSyncMode.Get())) -} - -// deriveClaimedWeight defines how a chain determines its claimed weight (by setting the cumulative weight of the -// latest commitment). -func (c *Chain) deriveClaimedWeight() (shutdown func()) { - return c.ClaimedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestCommitment *Commitment) uint64 { - if latestCommitment == nil { - return 0 - } - - return latestCommitment.CumulativeWeight() - }, c.LatestCommitment)) -} - -// deriveLatestAttestedWeight defines how a chain determines its attested weight (by inheriting the cumulative attested -// weight of the latest attested commitment). It uses inheritance instead of simply setting the value as the cumulative -// attested weight can change over time depending on the attestations that are received. -func (c *Chain) deriveLatestAttestedWeight() func() { - return c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) { - return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight) - }) -} - -// deriveVerifiedWeight defines how a chain determines its verified weight (by setting the cumulative weight of the -// latest produced commitment). -func (c *Chain) deriveVerifiedWeight() func() { - return c.VerifiedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestProducedCommitment *Commitment) uint64 { - if latestProducedCommitment == nil { - return 0 - } - - return latestProducedCommitment.CumulativeWeight() - }, c.LatestProducedCommitment)) -} - // deriveChildChains defines how a chain determines its ChildChains (by adding each child to the set). func (c *Chain) deriveChildChains(child *Chain) func() { c.ChildChains.Add(child) @@ -310,31 +270,6 @@ func (c *Chain) deriveParentChain(forkingPoint *Commitment) (shutdown func()) { return nil } -// deriveOutOfSyncThreshold defines how a chain determines its "out of sync" threshold (the latest seen slot minus 2 -// times the max committable age or 0 if this would cause an overflow to the negative numbers). -func (c *Chain) deriveOutOfSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() { - return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { - if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot { - return latestSeenSlot - outOfSyncOffset - } - - return 0 - }, latestSeenSlot)) -} - -// deriveWarpSyncThreshold defines how a chain determines its warp sync threshold (the latest seen slot minus the max -// committable age or 0 if this would cause an overflow to the negative numbers). -func (c *Chain) deriveWarpSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() { - return c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { - warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge() - if warpSyncOffset < latestSeenSlot { - return latestSeenSlot - warpSyncOffset - } - - return 0 - }, latestSeenSlot)) -} - // addCommitment adds the given commitment to this chain. func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) { c.commitments.Set(newCommitment.Slot(), newCommitment) @@ -343,6 +278,7 @@ func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) { return lo.Batch( newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }), + newCommitment.IsFullyBooked.OnTrigger(func() { c.LatestFullyBookedCommitment.Set(newCommitment) }), newCommitment.IsCommitted.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }), ) } @@ -363,7 +299,7 @@ func (c *Chain) dispatchBlockToSpawnedEngine(block *model.Block, src peer.ID) (d } // perform additional checks if we are in warp sync mode (only let blocks pass that we requested) - if c.WarpSyncMode.Get() { + if c.WarpSyncModeEnabled.Get() { // abort if the target commitment does not exist targetCommitment, targetCommitmentExists := c.Commitment(targetSlot) if !targetCommitmentExists { @@ -400,3 +336,29 @@ func (c *Chain) verifiedWeight() reactive.Variable[uint64] { func (c *Chain) attestedWeight() reactive.Variable[uint64] { return c.AttestedWeight } + +// outOfSyncThreshold returns the slot index at which the node is considered out of sync. +func outOfSyncThreshold(engineInstance *engine.Engine, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex { + if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot { + return latestSeenSlot - outOfSyncOffset + } + + return 0 +} + +// warpSyncModeEnabled determines whether warp sync mode should be enabled or not. +func warpSyncModeEnabled(enabled bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool { + // latest produced commitment is nil if we have not produced any commitment yet (intermediary state during + // startup) + if latestFullyBookedCommitment == nil { + return enabled + } + + // if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold + if enabled { + return latestFullyBookedCommitment.ID().Slot() < latestSeenSlot + } + + // if warp sync mode is disabled, enable it only if we fall below the out of sync threshold + return latestFullyBookedCommitment.ID().Slot() < outOfSyncThreshold +} diff --git a/pkg/protocol/commitment.go b/pkg/protocol/commitment.go index ca06a32ab..1b956deba 100644 --- a/pkg/protocol/commitment.go +++ b/pkg/protocol/commitment.go @@ -158,11 +158,11 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) { return lo.Batch( // mark commitments that are marked as root as verified c.IsCommitted.InheritFrom(c.IsRoot), + c.ReplayDroppedBlocks.InheritFrom(c.IsRoot), - // mark commitments that are marked as verified as attested, fully booked and committable + // mark commitments that are marked as verified as attested and fully booked c.IsAttested.InheritFrom(c.IsCommitted), c.IsFullyBooked.InheritFrom(c.IsCommitted), - c.IsCommittable.InheritFrom(c.IsCommitted), c.Parent.WithNonEmptyValue(func(parent *Commitment) func() { // the weight can be fixed as a one time operation (as it only relies on static information from the parent @@ -274,17 +274,17 @@ func (c *Commitment) deriveRequestAttestations(chain *Chain, parent *Commitment) // deriveWarpSyncBlocks derives the WarpSyncBlocks flag of this Commitment which is true if our Chain is requesting // warp sync, and we are the directly above the latest verified Commitment. func (c *Commitment) deriveWarpSyncBlocks(chain *Chain, parent *Commitment) func() { - return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSync bool, parentIsFullyBooked bool, isFullyBooked bool) bool { - return engineInstance != nil && warpSync && parentIsFullyBooked && !isFullyBooked - }, chain.Engine, chain.WarpSyncMode, parent.IsFullyBooked, c.IsFullyBooked)) + return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSyncModeEnabled bool, parentIsFullyBooked bool, isFullyBooked bool) bool { + return engineInstance != nil && warpSyncModeEnabled && parentIsFullyBooked && !isFullyBooked + }, chain.Engine, chain.WarpSyncModeEnabled, parent.IsFullyBooked, c.IsFullyBooked)) } // deriveReplayDroppedBlocks derives the ReplayDroppedBlocks flag of this Commitment which is true if our Chain has an // engine, is no longer requesting warp sync, and we are above the latest verified Commitment. func (c *Commitment) deriveReplayDroppedBlocks(chain *Chain) func() { - return c.ReplayDroppedBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, engineInstance *engine.Engine, warpSyncing bool, isAboveLatestVerifiedCommitment bool) bool { - return engineInstance != nil && !warpSyncing && isAboveLatestVerifiedCommitment - }, chain.Engine, chain.WarpSyncMode, c.IsAboveLatestVerifiedCommitment)) + return c.ReplayDroppedBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, engineInstance *engine.Engine, warpSyncModeEnabled bool, isAboveLatestVerifiedCommitment bool) bool { + return engineInstance != nil && !warpSyncModeEnabled && isAboveLatestVerifiedCommitment + }, chain.Engine, chain.WarpSyncModeEnabled, c.IsAboveLatestVerifiedCommitment)) } // forceChain forces the Chain of this Commitment to the given Chain by promoting it to the main child of its parent if @@ -296,3 +296,12 @@ func (c *Commitment) forceChain(targetChain *Chain) { } } } + +// cumulativeWeight returns the cumulative weight of this Commitment while gracefully handling nil receivers. +func (c *Commitment) cumulativeWeight() uint64 { + if c == nil { + return 0 + } + + return c.CumulativeWeight() +} diff --git a/pkg/protocol/commitment_verifier.go b/pkg/protocol/commitment_verifier.go index ab600fbc9..3ee2b4cc8 100644 --- a/pkg/protocol/commitment_verifier.go +++ b/pkg/protocol/commitment_verifier.go @@ -5,6 +5,7 @@ import ( "github.com/iotaledger/hive.go/ds" "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/kvstore/mapdb" + "github.com/iotaledger/hive.go/runtime/syncutils" "github.com/iotaledger/iota-core/pkg/model" "github.com/iotaledger/iota-core/pkg/protocol/engine" "github.com/iotaledger/iota-core/pkg/protocol/engine/accounts" @@ -22,6 +23,9 @@ type CommitmentVerifier struct { // validatorAccountsData is the accounts data of the validators for the current epoch as known at lastCommonSlotBeforeFork. // Initially, it is set to the accounts data of the validators for the epoch of the last common commitment before the fork. validatorAccountsData map[iotago.AccountID]*accounts.AccountData + + // mutex is used to synchronize access to validatorAccountsData and epoch. + mutex syncutils.RWMutex } func newCommitmentVerifier(mainEngine *engine.Engine, lastCommonCommitmentBeforeFork *model.Commitment) (*CommitmentVerifier, error) { @@ -76,6 +80,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio // This is necessary because the committee might have rotated at the epoch boundary and different validators might be part of it. // In case anything goes wrong we keep using previously known accounts data (initially set to the accounts data // of the validators for the epoch of the last common commitment before the fork). + c.mutex.Lock() apiForSlot := c.engine.APIForSlot(commitment.Slot()) commitmentEpoch := apiForSlot.TimeProvider().EpochFromSlot(commitment.Slot()) if commitmentEpoch > c.epoch { @@ -92,6 +97,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio } } } + c.mutex.Unlock() // 3. Verify attestations. blockIDs, seatCount, err := c.verifyAttestations(attestations) @@ -107,6 +113,9 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio } func (c *CommitmentVerifier) verifyAttestations(attestations []*iotago.Attestation) (iotago.BlockIDs, uint64, error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + visitedIdentities := ds.NewSet[iotago.AccountID]() var blockIDs iotago.BlockIDs var seatCount uint64 diff --git a/pkg/protocol/protocol_blocks.go b/pkg/protocol/protocol_blocks.go index b18983f3b..3afd74807 100644 --- a/pkg/protocol/protocol_blocks.go +++ b/pkg/protocol/protocol_blocks.go @@ -53,21 +53,12 @@ func newBlocksProtocol(protocol *Protocol) *BlocksProtocol { }) }) - protocol.Chains.WithElements(func(chain *Chain) func() { - return chain.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) { - return engineInstance.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook - }) - }) - - protocol.Chains.Main.Get().Engine.OnUpdateWithContext(func(_ *engine.Engine, engine *engine.Engine, unsubscribeOnEngineChange func(subscriptionFactory func() (unsubscribe func()))) { - if engine != nil { - unsubscribeOnEngineChange(func() (unsubscribe func()) { - return lo.Batch( - engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, - engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, - ) - }) - } + protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) { + return lo.Batch( + engine.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook, + engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, + engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook, + ) }) }) diff --git a/pkg/protocol/protocol_warp_sync.go b/pkg/protocol/protocol_warp_sync.go index 844881d89..374aaa3c6 100644 --- a/pkg/protocol/protocol_warp_sync.go +++ b/pkg/protocol/protocol_warp_sync.go @@ -47,8 +47,8 @@ func newWarpSyncProtocol(protocol *Protocol) *WarpSyncProtocol { protocol.Constructed.OnTrigger(func() { protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) { - return chain.WarpSyncMode.OnUpdate(func(_ bool, warpSyncMode bool) { - if warpSyncMode { + return chain.WarpSyncModeEnabled.OnUpdate(func(_ bool, warpSyncModeEnabled bool) { + if warpSyncModeEnabled { engine.Workers.WaitChildren() engine.Reset() } @@ -110,7 +110,7 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo return } - if !chain.WarpSyncMode.Get() { + if !chain.WarpSyncModeEnabled.Get() { w.LogTrace("response for chain without warp-sync", "chain", chain.LogName(), "fromPeer", from) return @@ -161,7 +161,7 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo targetEngine.Workers.WaitChildren() - if !chain.WarpSyncMode.Get() { + if !chain.WarpSyncModeEnabled.Get() { w.LogTrace("response for chain without warp-sync", "chain", chain.LogName(), "fromPeer", from) return blocksToWarpSync @@ -177,7 +177,7 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo // 2. Mark all blocks as accepted // 3. Force commitment of the slot forceCommitmentFunc := func() { - if !chain.WarpSyncMode.Get() { + if !chain.WarpSyncModeEnabled.Get() { return } @@ -247,20 +247,25 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo }) } + // Once all blocks are fully booked we can mark the commitment that is minCommittableAge older as this + // commitment to be committable. commitment.IsFullyBooked.OnUpdateOnce(func(_ bool, _ bool) { - // Let's assume that MCA is 5: when we want to book 15, we expect to have the commitment of 10 to load - // accounts from it, hence why we make committable the slot at - MCA + 1 with respect of the current slot. - minimumCommittableAge := w.protocol.APIForSlot(commitmentID.Slot()).ProtocolParameters().MinCommittableAge() - if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - minimumCommittableAge); exists { - committableCommitment.IsCommittable.Set(true) + if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - targetEngine.LatestAPI().ProtocolParameters().MinCommittableAge()); exists { + w.workerPool.Submit(func() { + committableCommitment.IsCommittable.Set(true) + }) } }) - commitment.IsCommittable.OnUpdateOnce(func(_ bool, _ bool) { - w.workerPool.Submit(forceCommitmentFunc) + // force commit one by one and wait for the parent to be committed before we can commit the next one + commitment.Parent.WithNonEmptyValue(func(parent *Commitment) (teardown func()) { + return parent.IsCommitted.WithNonEmptyValue(func(_ bool) (teardown func()) { + return commitment.IsCommittable.OnTrigger(forceCommitmentFunc) + }) }) if totalBlocks == 0 { + commitment.IsCommittable.Set(true) commitment.IsFullyBooked.Set(true) return blocksToWarpSync diff --git a/pkg/tests/loss_of_acceptance_test.go b/pkg/tests/loss_of_acceptance_test.go index f47ea4e40..93ec48b57 100644 --- a/pkg/tests/loss_of_acceptance_test.go +++ b/pkg/tests/loss_of_acceptance_test.go @@ -173,8 +173,6 @@ func TestLossOfAcceptanceFromSnapshot(t *testing.T) { { ts.IssueBlocksAtSlots("", []iotago.SlotIndex{21, 22}, 2, "block0", ts.Nodes("node0-restarted"), true, false) - time.Sleep(10 * time.Second) - ts.AssertEqualStoredCommitmentAtIndex(20, ts.Nodes()...) ts.AssertLatestCommitmentSlotIndex(20, ts.Nodes()...) } diff --git a/pkg/testsuite/mock/node.go b/pkg/testsuite/mock/node.go index ab8d94abd..32394616a 100644 --- a/pkg/testsuite/mock/node.go +++ b/pkg/testsuite/mock/node.go @@ -225,11 +225,11 @@ func (n *Node) hookLogging(failOnBlockFiltered bool) { }) } -func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engine.Engine, engineName string) { +func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engine.Engine) { events := instance.Events events.BlockDAG.BlockAttached.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] BlockDAG.BlockAttached: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("BlockDAG.BlockAttached", "block", block.ID()) n.mutex.Lock() defer n.mutex.Unlock() @@ -237,78 +237,80 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi }) events.BlockDAG.BlockSolid.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] BlockDAG.BlockSolid: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("BlockDAG.BlockSolid", "block", block.ID()) }) events.BlockDAG.BlockInvalid.Hook(func(block *blocks.Block, err error) { - fmt.Printf("%s > [%s] BlockDAG.BlockInvalid: %s - %s\n", n.Name, engineName, block.ID(), err) + instance.LogTrace("BlockDAG.BlockInvalid", "block", block.ID(), "err", err) }) events.BlockDAG.BlockMissing.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] BlockDAG.BlockMissing: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("BlockDAG.BlockMissing", "block", block.ID()) }) events.BlockDAG.MissingBlockAttached.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] BlockDAG.MissingBlockAttached: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("BlockDAG.MissingBlockAttached", "block", block.ID()) }) events.SeatManager.BlockProcessed.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] SybilProtection.BlockProcessed: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("SeatManager.BlockProcessed", "block", block.ID()) }) events.Booker.BlockBooked.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Booker.BlockBooked: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("Booker.BlockBooked", "block", block.ID()) }) events.Booker.BlockInvalid.Hook(func(block *blocks.Block, err error) { - fmt.Printf("%s > [%s] Booker.BlockInvalid: %s - %s\n", n.Name, engineName, block.ID(), err.Error()) + instance.LogTrace("Booker.BlockInvalid", "block", block.ID(), "err", err) }) events.Booker.TransactionInvalid.Hook(func(metadata mempool.TransactionMetadata, err error) { - fmt.Printf("%s > [%s] Booker.TransactionInvalid: %s - %s\n", n.Name, engineName, metadata.ID(), err.Error()) + instance.LogTrace("Booker.TransactionInvalid", "tx", metadata.ID(), "err", err) }) events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Scheduler.BlockScheduled: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("Scheduler.BlockScheduled", "block", block.ID()) }) events.Scheduler.BlockEnqueued.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Scheduler.BlockEnqueued: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("Scheduler.BlockEnqueued", "block", block.ID()) }) events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Scheduler.BlockSkipped: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("Scheduler.BlockSkipped", "block", block.ID()) }) events.Scheduler.BlockDropped.Hook(func(block *blocks.Block, err error) { - fmt.Printf("%s > [%s] Scheduler.BlockDropped: %s - %s\n", n.Name, engineName, block.ID(), err.Error()) + instance.LogTrace("Scheduler.BlockDropped", "block", block.ID(), "err", err) }) events.Clock.AcceptedTimeUpdated.Hook(func(newTime time.Time) { - fmt.Printf("%s > [%s] Clock.AcceptedTimeUpdated: %s [Slot %d]\n", n.Name, engineName, newTime, instance.LatestAPI().TimeProvider().SlotFromTime(newTime)) + instance.LogTrace("Clock.AcceptedTimeUpdated", "time", newTime, "slot", instance.LatestAPI().TimeProvider().SlotFromTime(newTime)) }) events.Clock.ConfirmedTimeUpdated.Hook(func(newTime time.Time) { - fmt.Printf("%s > [%s] Clock.ConfirmedTimeUpdated: %s [Slot %d]\n", n.Name, engineName, newTime, instance.LatestAPI().TimeProvider().SlotFromTime(newTime)) + instance.LogTrace("Clock.ConfirmedTimeUpdated", "time", newTime, "slot", instance.LatestAPI().TimeProvider().SlotFromTime(newTime)) }) events.PreSolidFilter.BlockPreAllowed.Hook(func(block *model.Block) { - fmt.Printf("%s > [%s] PreSolidFilter.BlockPreAllowed: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("PreSolidFilter.BlockPreAllowed", "block", block.ID()) }) events.PreSolidFilter.BlockPreFiltered.Hook(func(event *presolidfilter.BlockPreFilteredEvent) { - fmt.Printf("%s > [%s] PreSolidFilter.BlockPreFiltered: %s - %s\n", n.Name, engineName, event.Block.ID(), event.Reason.Error()) + instance.LogTrace("PreSolidFilter.BlockPreFiltered", "block", event.Block.ID(), "err", event.Reason) + if failOnBlockFiltered { n.Testing.Fatal("no blocks should be prefiltered") } }) events.PostSolidFilter.BlockAllowed.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] PostSolidFilter.BlockAllowed: %s\n", n.Name, engineName, block.ID()) + instance.LogTrace("PostSolidFilter.BlockAllowed", "block", block.ID()) }) events.PostSolidFilter.BlockFiltered.Hook(func(event *postsolidfilter.BlockFilteredEvent) { - fmt.Printf("%s > [%s] PostSolidFilter.BlockFiltered: %s - %s\n", n.Name, engineName, event.Block.ID(), event.Reason.Error()) + instance.LogTrace("PostSolidFilter.BlockFiltered", "block", event.Block.ID(), "err", event.Reason) + if failOnBlockFiltered { n.Testing.Fatal("no blocks should be filtered") } @@ -319,11 +321,11 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi }) events.BlockRequester.Tick.Hook(func(blockID iotago.BlockID) { - fmt.Printf("%s > [%s] BlockRequester.Tick: %s\n", n.Name, engineName, blockID) + instance.LogTrace("BlockRequester.Tick", "block", blockID) }) events.BlockProcessed.Hook(func(blockID iotago.BlockID) { - fmt.Printf("%s > [%s] Engine.BlockProcessed: %s\n", n.Name, engineName, blockID) + instance.LogTrace("BlockProcessed", "block", blockID) }) events.Notarization.SlotCommitted.Hook(func(details *notarization.SlotCommittedDetails) { @@ -350,117 +352,116 @@ func (n *Node) attachEngineLogsWithName(failOnBlockFiltered bool, instance *engi require.NoError(n.Testing, err) } - fmt.Printf("%s > [%s] NotarizationManager.SlotCommitted: %s %s Accepted Blocks: %s\n %s\n Attestations: %s\n", n.Name, engineName, details.Commitment.ID(), details.Commitment, acceptedBlocks, roots, attestationBlockIDs) + instance.LogTrace("NotarizationManager.SlotCommitted", "commitment", details.Commitment.ID(), "acceptedBlocks", acceptedBlocks, "roots", roots, "attestations", attestationBlockIDs) }) events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) { - fmt.Printf("%s > [%s] NotarizationManager.LatestCommitmentUpdated: %s\n", n.Name, engineName, commitment.ID()) + instance.LogTrace("NotarizationManager.LatestCommitmentUpdated", "commitment", commitment.ID()) }) events.BlockGadget.BlockPreAccepted.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockPreAccepted: %s %s\n", n.Name, engineName, block.ID(), block.ProtocolBlock().Header.SlotCommitmentID) + instance.LogTrace("BlockGadget.BlockPreAccepted", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID) }) events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockAccepted: %s @ slot %s committing to %s\n", n.Name, engineName, block.ID(), block.ID().Slot(), block.ProtocolBlock().Header.SlotCommitmentID) + instance.LogTrace("BlockGadget.BlockAccepted", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID) }) events.BlockGadget.BlockPreConfirmed.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockPreConfirmed: %s %s\n", n.Name, engineName, block.ID(), block.ProtocolBlock().Header.SlotCommitmentID) + instance.LogTrace("BlockGadget.BlockPreConfirmed", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID) }) events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) { - fmt.Printf("%s > [%s] Consensus.BlockGadget.BlockConfirmed: %s %s\n", n.Name, engineName, block.ID(), block.ProtocolBlock().Header.SlotCommitmentID) + instance.LogTrace("BlockGadget.BlockConfirmed", "block", block.ID(), "slotCommitmentID", block.ProtocolBlock().Header.SlotCommitmentID) }) events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) { - fmt.Printf("%s > [%s] Consensus.SlotGadget.SlotFinalized: %s\n", n.Name, engineName, slot) + instance.LogTrace("SlotGadget.SlotFinalized", "slot", slot) }) events.SeatManager.OnlineCommitteeSeatAdded.Hook(func(seat account.SeatIndex, accountID iotago.AccountID) { - fmt.Printf("%s > [%s] SybilProtection.OnlineCommitteeSeatAdded: %d - %s\n", n.Name, engineName, seat, accountID) + instance.LogTrace("SybilProtection.OnlineCommitteeSeatAdded", "seat", seat, "accountID", accountID) }) events.SeatManager.OnlineCommitteeSeatRemoved.Hook(func(seat account.SeatIndex) { - fmt.Printf("%s > [%s] SybilProtection.OnlineCommitteeSeatRemoved: %d\n", n.Name, engineName, seat) + instance.LogTrace("SybilProtection.OnlineCommitteeSeatRemoved", "seat", seat) }) events.SybilProtection.CommitteeSelected.Hook(func(committee *account.Accounts, epoch iotago.EpochIndex) { - fmt.Printf("%s > [%s] SybilProtection.CommitteeSelected: epoch %d - %s\n", n.Name, engineName, epoch, committee.IDs()) + instance.LogTrace("SybilProtection.CommitteeSelected", "epoch", epoch, "committee", committee.IDs()) }) - events.SpendDAG.SpenderCreated.Hook(func(spenderID iotago.TransactionID) { - fmt.Printf("%s > [%s] SpendDAG.SpendCreated: %s\n", n.Name, engineName, spenderID) + events.SpendDAG.SpenderCreated.Hook(func(conflictID iotago.TransactionID) { + instance.LogTrace("SpendDAG.SpenderCreated", "conflictID", conflictID) }) - events.SpendDAG.SpenderEvicted.Hook(func(spenderID iotago.TransactionID) { - fmt.Printf("%s > [%s] SpendDAG.SpendEvicted: %s\n", n.Name, engineName, spenderID) + events.SpendDAG.SpenderEvicted.Hook(func(conflictID iotago.TransactionID) { + instance.LogTrace("SpendDAG.SpenderEvicted", "conflictID", conflictID) }) - events.SpendDAG.SpenderRejected.Hook(func(spenderID iotago.TransactionID) { - fmt.Printf("%s > [%s] SpendDAG.SpendRejected: %s\n", n.Name, engineName, spenderID) + + events.SpendDAG.SpenderRejected.Hook(func(conflictID iotago.TransactionID) { + instance.LogTrace("SpendDAG.SpenderRejected", "conflictID", conflictID) }) - events.SpendDAG.SpenderAccepted.Hook(func(spenderID iotago.TransactionID) { - fmt.Printf("%s > [%s] SpendDAG.SpendAccepted: %s\n", n.Name, engineName, spenderID) + events.SpendDAG.SpenderAccepted.Hook(func(conflictID iotago.TransactionID) { + instance.LogTrace("SpendDAG.SpenderAccepted", "conflictID", conflictID) }) instance.Ledger.MemPool().OnSignedTransactionAttached( func(signedTransactionMetadata mempool.SignedTransactionMetadata) { signedTransactionMetadata.OnSignaturesInvalid(func(err error) { - fmt.Printf("%s > [%s] MemPool.SignedTransactionSignaturesInvalid(%s): %s\n", n.Name, engineName, err, signedTransactionMetadata.ID()) + instance.LogTrace("MemPool.SignedTransactionSignaturesInvalid", "tx", signedTransactionMetadata.ID(), "err", err) }) }, ) instance.Ledger.OnTransactionAttached(func(transactionMetadata mempool.TransactionMetadata) { - fmt.Printf("%s > [%s] Ledger.TransactionAttached: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("Ledger.TransactionAttached", "tx", transactionMetadata.ID()) transactionMetadata.OnSolid(func() { - fmt.Printf("%s > [%s] MemPool.TransactionSolid: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionSolid", "tx", transactionMetadata.ID()) }) transactionMetadata.OnExecuted(func() { - fmt.Printf("%s > [%s] MemPool.TransactionExecuted: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionExecuted", "tx", transactionMetadata.ID()) }) transactionMetadata.OnBooked(func() { - fmt.Printf("%s > [%s] MemPool.TransactionBooked: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionBooked", "tx", transactionMetadata.ID()) }) transactionMetadata.OnConflicting(func() { - fmt.Printf("%s > [%s] MemPool.TransactionConflicting: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionConflicting", "tx", transactionMetadata.ID()) }) transactionMetadata.OnAccepted(func() { - fmt.Printf("%s > [%s] MemPool.TransactionAccepted: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionAccepted", "tx", transactionMetadata.ID()) }) transactionMetadata.OnRejected(func() { - fmt.Printf("%s > [%s] MemPool.TransactionRejected: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionRejected", "tx", transactionMetadata.ID()) }) transactionMetadata.OnInvalid(func(err error) { - fmt.Printf("%s > [%s] MemPool.TransactionInvalid(%s): %s\n", n.Name, engineName, err, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionInvalid", "tx", transactionMetadata.ID(), "err", err) }) transactionMetadata.OnOrphanedSlotUpdated(func(slot iotago.SlotIndex) { - fmt.Printf("%s > [%s] MemPool.TransactionOrphanedSlotUpdated in slot %d: %s\n", n.Name, engineName, slot, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionOrphanedSlotUpdated", "tx", transactionMetadata.ID(), "slot", slot) }) transactionMetadata.OnCommittedSlotUpdated(func(slot iotago.SlotIndex) { - fmt.Printf("%s > [%s] MemPool.TransactionCommittedSlotUpdated in slot %d: %s\n", n.Name, engineName, slot, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionCommittedSlotUpdated", "tx", transactionMetadata.ID(), "slot", slot) }) transactionMetadata.OnPending(func() { - fmt.Printf("%s > [%s] MemPool.TransactionPending: %s\n", n.Name, engineName, transactionMetadata.ID()) + instance.LogTrace("MemPool.TransactionPending", "tx", transactionMetadata.ID()) }) }) } func (n *Node) attachEngineLogs(failOnBlockFiltered bool, instance *engine.Engine) { - engineName := fmt.Sprintf("%s - %s", lo.Cond(n.Protocol.Engines.Main.Get() != instance, "Candidate", "Main"), instance.Name()[:8]) - - n.attachEngineLogsWithName(failOnBlockFiltered, instance, engineName) + n.attachEngineLogsWithName(failOnBlockFiltered, instance) } func (n *Node) Wait() { diff --git a/pkg/testsuite/storage_settings.go b/pkg/testsuite/storage_settings.go index 956797be7..04409b25b 100644 --- a/pkg/testsuite/storage_settings.go +++ b/pkg/testsuite/storage_settings.go @@ -68,8 +68,15 @@ func (t *TestSuite) AssertCommitmentSlotIndexExists(slot iotago.SlotIndex, nodes return ierrors.Errorf("AssertCommitmentSlotIndexExists: %s: commitment at index %v not found", node.Name, slot) } + // Make sure the main chain exists + mainChain := node.Protocol.Chains.Main.Get() + if mainChain == nil { + return ierrors.Errorf("AssertCommitmentSlotIndexExists: %s: main chain not found when checking for commitment at index %v", node.Name, slot) + } + // Make sure the commitment is also available in the ChainManager. - if node.Protocol.Chains.Main.Get().LatestCommitment.Get().ID().Slot() < slot { + latestCommitment := mainChain.LatestCommitment.Get() + if latestCommitment == nil || latestCommitment.ID().Slot() < slot { return ierrors.Errorf("AssertCommitmentSlotIndexExists: %s: commitment at index %v not found in ChainManager", node.Name, slot) } @@ -126,8 +133,12 @@ func (t *TestSuite) AssertChainID(expectedChainID iotago.CommitmentID, nodes ... for _, node := range nodes { t.Eventually(func() error { - actualChainID := node.Protocol.Chains.Main.Get().ForkingPoint.Get().ID() + mainChain := node.Protocol.Chains.Main.Get() + if mainChain == nil { + return ierrors.Errorf("AssertChainID: %s: main chain not found", node.Name) + } + actualChainID := mainChain.ForkingPoint.Get().ID() if expectedChainID != actualChainID { fmt.Println(expectedChainID, actualChainID)