Skip to content

Commit

Permalink
Fix bugs in WarpSync logic (#577)
Browse files Browse the repository at this point in the history
fixed bugs in warpsync logic and optimized logging during debug
  • Loading branch information
hmoog authored Dec 3, 2023
1 parent 7243706 commit af324c3
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 205 deletions.
2 changes: 1 addition & 1 deletion pkg/core/buffer/unsolid_commitment_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (u *UnsolidCommitmentBuffer[V]) Add(commitmentID iotago.CommitmentID, value
u.mutex.RLock()
defer u.mutex.RUnlock()

if commitmentID.Slot() <= u.lastEvictedSlot {
if u.lastEvictedSlot != 0 && commitmentID.Slot() <= u.lastEvictedSlot {
return false
}

Expand Down
182 changes: 72 additions & 110 deletions pkg/protocol/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Chain struct {
// LatestAttestedCommitment contains the latest commitment of this chain for which attestations were received.
LatestAttestedCommitment reactive.Variable[*Commitment]

// LatestFullyBookedCommitment contains the latest commitment of this chain for which all blocks were booked.
LatestFullyBookedCommitment reactive.Variable[*Commitment]

// LatestProducedCommitment contains the latest commitment of this chain that we produced ourselves by booking the
// corresponding blocks in the Engine.
LatestProducedCommitment reactive.Variable[*Commitment]
Expand All @@ -45,12 +48,8 @@ type Chain struct {
// latest verified commitment.
VerifiedWeight reactive.Variable[uint64]

// WarpSyncMode contains a flag that indicates whether this chain is in warp sync mode.
WarpSyncMode reactive.Variable[bool]

// WarpSyncThreshold contains the slot at which the chain will exit warp sync mode which is derived from the latest
// network slot minus the max committable age.
WarpSyncThreshold reactive.Variable[iotago.SlotIndex]
// WarpSyncModeEnabled contains a flag that indicates whether this chain is in warp sync mode.
WarpSyncModeEnabled reactive.Variable[bool]

// OutOfSyncThreshold contains the slot at which the chain will consider itself to be out of sync and switch to warp
// sync mode. It is derived from the latest network slot minus two times the max committable age.
Expand Down Expand Up @@ -83,22 +82,22 @@ type Chain struct {
// newChain creates a new chain instance.
func newChain(chains *Chains) *Chain {
c := &Chain{
ForkingPoint: reactive.NewVariable[*Commitment](),
ParentChain: reactive.NewVariable[*Chain](),
ChildChains: reactive.NewSet[*Chain](),
LatestCommitment: reactive.NewVariable[*Commitment](),
LatestAttestedCommitment: reactive.NewVariable[*Commitment](),
LatestProducedCommitment: reactive.NewVariable[*Commitment](),
ClaimedWeight: reactive.NewVariable[uint64](),
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSyncMode: reactive.NewVariable[bool](),
WarpSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
RequestAttestations: reactive.NewVariable[bool](),
StartEngine: reactive.NewVariable[bool](),
Engine: reactive.NewVariable[*engine.Engine](),
IsEvicted: reactive.NewEvent(),
ForkingPoint: reactive.NewVariable[*Commitment](),
ParentChain: reactive.NewVariable[*Chain](),
ChildChains: reactive.NewSet[*Chain](),
LatestCommitment: reactive.NewVariable[*Commitment](),
LatestAttestedCommitment: reactive.NewVariable[*Commitment](),
LatestFullyBookedCommitment: reactive.NewVariable[*Commitment](),
LatestProducedCommitment: reactive.NewVariable[*Commitment](),
ClaimedWeight: reactive.NewVariable[uint64](),
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSyncModeEnabled: reactive.NewVariable[bool]().Init(true),
OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
RequestAttestations: reactive.NewVariable[bool](),
StartEngine: reactive.NewVariable[bool](),
Engine: reactive.NewVariable[*engine.Engine](),
IsEvicted: reactive.NewEvent(),

chains: chains,
commitments: shrinkingmap.New[iotago.SlotIndex, *Commitment](),
Expand Down Expand Up @@ -157,6 +156,8 @@ func (c *Chain) DispatchBlock(block *model.Block, src peer.ID) (dispatched bool)
func (c *Chain) Commitment(slot iotago.SlotIndex) (commitment *Commitment, exists bool) {
for currentChain := c; currentChain != nil; {
switch forkingPoint := currentChain.ForkingPoint.Get(); {
case forkingPoint == nil:
return nil, false
case forkingPoint.Slot() == slot:
return forkingPoint, true
case slot > forkingPoint.Slot():
Expand Down Expand Up @@ -186,14 +187,15 @@ func (c *Chain) initLogger() (shutdown func()) {
c.Logger, shutdown = c.chains.NewEntityLogger("")

return lo.Batch(
c.WarpSyncMode.LogUpdates(c, log.LevelTrace, "WarpSyncMode"),
c.WarpSyncThreshold.LogUpdates(c, log.LevelTrace, "WarpSyncThreshold"),
c.WarpSyncModeEnabled.LogUpdates(c, log.LevelTrace, "WarpSyncModeEnabled"),
c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"),
c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName),
c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"),
c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"),
c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"),
c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName),
c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName),
c.LatestFullyBookedCommitment.LogUpdates(c, log.LevelTrace, "LatestFullyBookedCommitment", (*Commitment).LogName),
c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName),
c.RequestAttestations.LogUpdates(c, log.LevelTrace, "RequestAttestations"),
c.StartEngine.LogUpdates(c, log.LevelDebug, "StartEngine"),
Expand All @@ -207,10 +209,21 @@ func (c *Chain) initLogger() (shutdown func()) {
// initDerivedProperties initializes the behavior of this chain by setting up the relations between its properties.
func (c *Chain) initDerivedProperties() (shutdown func()) {
return lo.Batch(
c.deriveClaimedWeight(),
c.deriveVerifiedWeight(),
c.deriveLatestAttestedWeight(),
c.deriveWarpSyncMode(),
c.ClaimedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestCommitment *Commitment) uint64 {
return latestCommitment.cumulativeWeight()
}, c.LatestCommitment)),

c.VerifiedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestProducedCommitment *Commitment) uint64 {
return latestProducedCommitment.cumulativeWeight()
}, c.LatestProducedCommitment)),

c.WarpSyncModeEnabled.DeriveValueFrom(reactive.NewDerivedVariable3(func(enabled bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
return warpSyncModeEnabled(enabled, latestFullyBookedCommitment, latestSeenSlot, outOfSyncThreshold)
}, c.LatestFullyBookedCommitment, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncModeEnabled.Get())),

c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) {
return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight)
}),

c.ForkingPoint.WithValue(func(forkingPoint *Commitment) (shutdown func()) {
return c.deriveParentChain(forkingPoint)
Expand All @@ -221,66 +234,13 @@ func (c *Chain) initDerivedProperties() (shutdown func()) {
}),

c.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) {
return lo.Batch(
c.deriveWarpSyncThreshold(c.chains.LatestSeenSlot, engineInstance),
c.deriveOutOfSyncThreshold(c.chains.LatestSeenSlot, engineInstance),
)
return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
return outOfSyncThreshold(engineInstance, latestSeenSlot)
}, c.chains.LatestSeenSlot))
}),
)
}

// deriveWarpSyncMode makes WarpSyncMode a derived value of the latest produced commitment, the warp sync threshold and
// the out of sync threshold. The derivation is seeded with the value WarpSyncMode holds when this method is called.
func (c *Chain) deriveWarpSyncMode() func() {
	computeWarpSyncMode := func(currentMode bool, latestProduced *Commitment, exitThreshold iotago.SlotIndex, enterThreshold iotago.SlotIndex) bool {
		switch {
		case latestProduced == nil:
			// no commitment was produced yet (intermediary state during startup), so the current mode is kept
			return currentMode
		case currentMode:
			// already warp syncing: stay in that mode for as long as we are below the warp sync threshold
			return latestProduced.ID().Slot() < exitThreshold
		default:
			// not warp syncing: only switch it on once we have fallen below the out of sync threshold
			return latestProduced.ID().Slot() < enterThreshold
		}
	}

	return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(computeWarpSyncMode, c.LatestProducedCommitment, c.WarpSyncThreshold, c.OutOfSyncThreshold, c.WarpSyncMode.Get()))
}

// deriveClaimedWeight makes ClaimedWeight follow the cumulative weight of the latest commitment of this chain (0 while
// there is no latest commitment).
func (c *Chain) deriveClaimedWeight() (shutdown func()) {
	claimedWeight := reactive.NewDerivedVariable(func(_ uint64, latest *Commitment) uint64 {
		if latest != nil {
			return latest.CumulativeWeight()
		}

		return 0
	}, c.LatestCommitment)

	return c.ClaimedWeight.DeriveValueFrom(claimedWeight)
}

// deriveLatestAttestedWeight makes AttestedWeight inherit the cumulative attested weight of the latest attested
// commitment. Inheritance is used (rather than copying the value once) because the cumulative attested weight of a
// commitment can still change as further attestations are received.
func (c *Chain) deriveLatestAttestedWeight() func() {
	inheritAttestedWeight := func(attested *Commitment) (shutdown func()) {
		return c.AttestedWeight.InheritFrom(attested.CumulativeAttestedWeight)
	}

	return c.LatestAttestedCommitment.WithNonEmptyValue(inheritAttestedWeight)
}

// deriveVerifiedWeight makes VerifiedWeight follow the cumulative weight of the latest produced commitment of this
// chain (0 while there is no produced commitment).
func (c *Chain) deriveVerifiedWeight() func() {
	verifiedWeight := reactive.NewDerivedVariable(func(_ uint64, latestProduced *Commitment) uint64 {
		if latestProduced != nil {
			return latestProduced.CumulativeWeight()
		}

		return 0
	}, c.LatestProducedCommitment)

	return c.VerifiedWeight.DeriveValueFrom(verifiedWeight)
}

// deriveChildChains defines how a chain determines its ChildChains (by adding each child to the set).
func (c *Chain) deriveChildChains(child *Chain) func() {
c.ChildChains.Add(child)
Expand Down Expand Up @@ -310,31 +270,6 @@ func (c *Chain) deriveParentChain(forkingPoint *Commitment) (shutdown func()) {
return nil
}

// deriveOutOfSyncThreshold makes OutOfSyncThreshold a derived value of the latest seen slot: the latest seen slot minus
// two times the max committable age of the given engine, or 0 if that offset is not smaller than the latest seen slot.
func (c *Chain) deriveOutOfSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() {
	return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, seenSlot iotago.SlotIndex) iotago.SlotIndex {
		offset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge()
		if offset >= seenSlot {
			// subtracting would not yield a positive slot, so the threshold is clamped to 0
			return 0
		}

		return seenSlot - offset
	}, latestSeenSlot))
}

// deriveWarpSyncThreshold makes WarpSyncThreshold a derived value of the latest seen slot: the latest seen slot minus
// the max committable age of the given engine, or 0 if that offset is not smaller than the latest seen slot.
func (c *Chain) deriveWarpSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() {
	return c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, seenSlot iotago.SlotIndex) iotago.SlotIndex {
		offset := engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge()
		if offset >= seenSlot {
			// subtracting would not yield a positive slot, so the threshold is clamped to 0
			return 0
		}

		return seenSlot - offset
	}, latestSeenSlot))
}

// addCommitment adds the given commitment to this chain.
func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {
c.commitments.Set(newCommitment.Slot(), newCommitment)
Expand All @@ -343,6 +278,7 @@ func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {

return lo.Batch(
newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }),
newCommitment.IsFullyBooked.OnTrigger(func() { c.LatestFullyBookedCommitment.Set(newCommitment) }),
newCommitment.IsCommitted.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }),
)
}
Expand All @@ -363,7 +299,7 @@ func (c *Chain) dispatchBlockToSpawnedEngine(block *model.Block, src peer.ID) (d
}

// perform additional checks if we are in warp sync mode (only let blocks pass that we requested)
if c.WarpSyncMode.Get() {
if c.WarpSyncModeEnabled.Get() {
// abort if the target commitment does not exist
targetCommitment, targetCommitmentExists := c.Commitment(targetSlot)
if !targetCommitmentExists {
Expand Down Expand Up @@ -400,3 +336,29 @@ func (c *Chain) verifiedWeight() reactive.Variable[uint64] {
func (c *Chain) attestedWeight() reactive.Variable[uint64] {
return c.AttestedWeight
}

// outOfSyncThreshold returns the slot index at which the node is considered out of sync: the latest seen slot minus
// two times the max committable age of the given engine, or 0 if that offset is not smaller than the latest seen slot.
func outOfSyncThreshold(engineInstance *engine.Engine, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
	offset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge()
	if offset >= latestSeenSlot {
		// subtracting would not yield a positive slot, so the threshold is clamped to 0
		return 0
	}

	return latestSeenSlot - offset
}

// warpSyncModeEnabled determines whether warp sync mode should be enabled or not, given the current state of the flag,
// the latest fully booked commitment, the latest seen slot and the out of sync threshold.
func warpSyncModeEnabled(enabled bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
	switch {
	case latestFullyBookedCommitment == nil:
		// without a fully booked commitment there is nothing to compare against, so the current state is kept
		return enabled
	case enabled:
		// once enabled, warp sync mode stays on while the latest fully booked slot is behind the latest seen slot
		return latestFullyBookedCommitment.ID().Slot() < latestSeenSlot
	default:
		// while disabled, warp sync mode is only switched on if the latest fully booked slot is below the out of sync
		// threshold
		return latestFullyBookedCommitment.ID().Slot() < outOfSyncThreshold
	}
}
25 changes: 17 additions & 8 deletions pkg/protocol/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
return lo.Batch(
// mark commitments that are marked as root as verified
c.IsCommitted.InheritFrom(c.IsRoot),
c.ReplayDroppedBlocks.InheritFrom(c.IsRoot),

// mark commitments that are marked as verified as attested, fully booked and committable
// mark commitments that are marked as verified as attested and fully booked
c.IsAttested.InheritFrom(c.IsCommitted),
c.IsFullyBooked.InheritFrom(c.IsCommitted),
c.IsCommittable.InheritFrom(c.IsCommitted),

c.Parent.WithNonEmptyValue(func(parent *Commitment) func() {
// the weight can be fixed as a one time operation (as it only relies on static information from the parent
Expand Down Expand Up @@ -274,17 +274,17 @@ func (c *Commitment) deriveRequestAttestations(chain *Chain, parent *Commitment)
// deriveWarpSyncBlocks derives the WarpSyncBlocks flag of this Commitment which is true if our Chain has an engine, is
// in warp sync mode, and this Commitment is not yet fully booked while its parent already is.
func (c *Commitment) deriveWarpSyncBlocks(chain *Chain, parent *Commitment) func() {
return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSync bool, parentIsFullyBooked bool, isFullyBooked bool) bool {
return engineInstance != nil && warpSync && parentIsFullyBooked && !isFullyBooked
}, chain.Engine, chain.WarpSyncMode, parent.IsFullyBooked, c.IsFullyBooked))
return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSyncModeEnabled bool, parentIsFullyBooked bool, isFullyBooked bool) bool {
return engineInstance != nil && warpSyncModeEnabled && parentIsFullyBooked && !isFullyBooked
}, chain.Engine, chain.WarpSyncModeEnabled, parent.IsFullyBooked, c.IsFullyBooked))
}

// deriveReplayDroppedBlocks derives the ReplayDroppedBlocks flag of this Commitment which is true if our Chain has an
// engine, is no longer requesting warp sync, and we are above the latest verified Commitment.
func (c *Commitment) deriveReplayDroppedBlocks(chain *Chain) func() {
return c.ReplayDroppedBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, engineInstance *engine.Engine, warpSyncing bool, isAboveLatestVerifiedCommitment bool) bool {
return engineInstance != nil && !warpSyncing && isAboveLatestVerifiedCommitment
}, chain.Engine, chain.WarpSyncMode, c.IsAboveLatestVerifiedCommitment))
return c.ReplayDroppedBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, engineInstance *engine.Engine, warpSyncModeEnabled bool, isAboveLatestVerifiedCommitment bool) bool {
return engineInstance != nil && !warpSyncModeEnabled && isAboveLatestVerifiedCommitment
}, chain.Engine, chain.WarpSyncModeEnabled, c.IsAboveLatestVerifiedCommitment))
}

// forceChain forces the Chain of this Commitment to the given Chain by promoting it to the main child of its parent if
Expand All @@ -296,3 +296,12 @@ func (c *Commitment) forceChain(targetChain *Chain) {
}
}
}

// cumulativeWeight is a nil-safe accessor for the cumulative weight of this Commitment: it yields 0 for a nil receiver
// and otherwise forwards to CumulativeWeight.
func (c *Commitment) cumulativeWeight() uint64 {
	if c != nil {
		return c.CumulativeWeight()
	}

	return 0
}
9 changes: 9 additions & 0 deletions pkg/protocol/commitment_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/iotaledger/hive.go/ds"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
"github.com/iotaledger/iota-core/pkg/protocol/engine/accounts"
Expand All @@ -22,6 +23,9 @@ type CommitmentVerifier struct {
// validatorAccountsData is the accounts data of the validators for the current epoch as known at lastCommonSlotBeforeFork.
// Initially, it is set to the accounts data of the validators for the epoch of the last common commitment before the fork.
validatorAccountsData map[iotago.AccountID]*accounts.AccountData

// mutex is used to synchronize access to validatorAccountsData and epoch.
mutex syncutils.RWMutex
}

func newCommitmentVerifier(mainEngine *engine.Engine, lastCommonCommitmentBeforeFork *model.Commitment) (*CommitmentVerifier, error) {
Expand Down Expand Up @@ -76,6 +80,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
// This is necessary because the committee might have rotated at the epoch boundary and different validators might be part of it.
// In case anything goes wrong we keep using previously known accounts data (initially set to the accounts data
// of the validators for the epoch of the last common commitment before the fork).
c.mutex.Lock()
apiForSlot := c.engine.APIForSlot(commitment.Slot())
commitmentEpoch := apiForSlot.TimeProvider().EpochFromSlot(commitment.Slot())
if commitmentEpoch > c.epoch {
Expand All @@ -92,6 +97,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
}
}
}
c.mutex.Unlock()

// 3. Verify attestations.
blockIDs, seatCount, err := c.verifyAttestations(attestations)
Expand All @@ -107,6 +113,9 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
}

func (c *CommitmentVerifier) verifyAttestations(attestations []*iotago.Attestation) (iotago.BlockIDs, uint64, error) {
c.mutex.RLock()
defer c.mutex.RUnlock()

visitedIdentities := ds.NewSet[iotago.AccountID]()
var blockIDs iotago.BlockIDs
var seatCount uint64
Expand Down
Loading

0 comments on commit af324c3

Please sign in to comment.