Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bugs in WarpSync logic #577

Merged
merged 17 commits into from
Dec 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/core/buffer/unsolid_commitment_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (u *UnsolidCommitmentBuffer[V]) Add(commitmentID iotago.CommitmentID, value
u.mutex.RLock()
defer u.mutex.RUnlock()

if commitmentID.Slot() <= u.lastEvictedSlot {
if u.lastEvictedSlot != 0 && commitmentID.Slot() <= u.lastEvictedSlot {
return false
}

Expand Down
182 changes: 72 additions & 110 deletions pkg/protocol/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Chain struct {
// LatestAttestedCommitment contains the latest commitment of this chain for which attestations were received.
LatestAttestedCommitment reactive.Variable[*Commitment]

// LatestFullyBookedCommitment contains the latest commitment of this chain for which all blocks were booked.
LatestFullyBookedCommitment reactive.Variable[*Commitment]

// LatestProducedCommitment contains the latest commitment of this chain that we produced ourselves by booking the
// corresponding blocks in the Engine.
LatestProducedCommitment reactive.Variable[*Commitment]
Expand All @@ -45,12 +48,8 @@ type Chain struct {
// latest verified commitment.
VerifiedWeight reactive.Variable[uint64]

// WarpSyncMode contains a flag that indicates whether this chain is in warp sync mode.
WarpSyncMode reactive.Variable[bool]

// WarpSyncThreshold contains the slot at which the chain will exit warp sync mode which is derived from the latest
// network slot minus the max committable age.
WarpSyncThreshold reactive.Variable[iotago.SlotIndex]
// WarpSyncModeEnabled contains a flag that indicates whether this chain is in warp sync mode.
WarpSyncModeEnabled reactive.Variable[bool]

// OutOfSyncThreshold contains the slot at which the chain will consider itself to be out of sync and switch to warp
// sync mode. It is derived from the latest network slot minus two times the max committable age.
Expand Down Expand Up @@ -83,22 +82,22 @@ type Chain struct {
// newChain creates a new chain instance.
func newChain(chains *Chains) *Chain {
c := &Chain{
ForkingPoint: reactive.NewVariable[*Commitment](),
ParentChain: reactive.NewVariable[*Chain](),
ChildChains: reactive.NewSet[*Chain](),
LatestCommitment: reactive.NewVariable[*Commitment](),
LatestAttestedCommitment: reactive.NewVariable[*Commitment](),
LatestProducedCommitment: reactive.NewVariable[*Commitment](),
ClaimedWeight: reactive.NewVariable[uint64](),
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSyncMode: reactive.NewVariable[bool](),
WarpSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
RequestAttestations: reactive.NewVariable[bool](),
StartEngine: reactive.NewVariable[bool](),
Engine: reactive.NewVariable[*engine.Engine](),
IsEvicted: reactive.NewEvent(),
ForkingPoint: reactive.NewVariable[*Commitment](),
ParentChain: reactive.NewVariable[*Chain](),
ChildChains: reactive.NewSet[*Chain](),
LatestCommitment: reactive.NewVariable[*Commitment](),
LatestAttestedCommitment: reactive.NewVariable[*Commitment](),
LatestFullyBookedCommitment: reactive.NewVariable[*Commitment](),
LatestProducedCommitment: reactive.NewVariable[*Commitment](),
ClaimedWeight: reactive.NewVariable[uint64](),
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSyncModeEnabled: reactive.NewVariable[bool]().Init(true),
OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
RequestAttestations: reactive.NewVariable[bool](),
StartEngine: reactive.NewVariable[bool](),
Engine: reactive.NewVariable[*engine.Engine](),
IsEvicted: reactive.NewEvent(),

chains: chains,
commitments: shrinkingmap.New[iotago.SlotIndex, *Commitment](),
Expand Down Expand Up @@ -157,6 +156,8 @@ func (c *Chain) DispatchBlock(block *model.Block, src peer.ID) (dispatched bool)
func (c *Chain) Commitment(slot iotago.SlotIndex) (commitment *Commitment, exists bool) {
for currentChain := c; currentChain != nil; {
switch forkingPoint := currentChain.ForkingPoint.Get(); {
case forkingPoint == nil:
return nil, false
case forkingPoint.Slot() == slot:
return forkingPoint, true
case slot > forkingPoint.Slot():
Expand Down Expand Up @@ -186,14 +187,15 @@ func (c *Chain) initLogger() (shutdown func()) {
c.Logger, shutdown = c.chains.NewEntityLogger("")

return lo.Batch(
c.WarpSyncMode.LogUpdates(c, log.LevelTrace, "WarpSyncMode"),
c.WarpSyncThreshold.LogUpdates(c, log.LevelTrace, "WarpSyncThreshold"),
c.WarpSyncModeEnabled.LogUpdates(c, log.LevelTrace, "WarpSyncModeEnabled"),
c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"),
c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName),
c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"),
c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"),
c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"),
c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName),
c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName),
c.LatestFullyBookedCommitment.LogUpdates(c, log.LevelTrace, "LatestFullyBookedCommitment", (*Commitment).LogName),
c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName),
c.RequestAttestations.LogUpdates(c, log.LevelTrace, "RequestAttestations"),
c.StartEngine.LogUpdates(c, log.LevelDebug, "StartEngine"),
Expand All @@ -207,10 +209,21 @@ func (c *Chain) initLogger() (shutdown func()) {
// initDerivedProperties initializes the behavior of this chain by setting up the relations between its properties.
func (c *Chain) initDerivedProperties() (shutdown func()) {
return lo.Batch(
c.deriveClaimedWeight(),
c.deriveVerifiedWeight(),
c.deriveLatestAttestedWeight(),
c.deriveWarpSyncMode(),
c.ClaimedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestCommitment *Commitment) uint64 {
return latestCommitment.cumulativeWeight()
}, c.LatestCommitment)),

c.VerifiedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestProducedCommitment *Commitment) uint64 {
return latestProducedCommitment.cumulativeWeight()
}, c.LatestProducedCommitment)),

c.WarpSyncModeEnabled.DeriveValueFrom(reactive.NewDerivedVariable3(func(enabled bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
return warpSyncModeEnabled(enabled, latestFullyBookedCommitment, latestSeenSlot, outOfSyncThreshold)
}, c.LatestFullyBookedCommitment, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncModeEnabled.Get())),

c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) {
return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight)
}),

c.ForkingPoint.WithValue(func(forkingPoint *Commitment) (shutdown func()) {
return c.deriveParentChain(forkingPoint)
Expand All @@ -221,66 +234,13 @@ func (c *Chain) initDerivedProperties() (shutdown func()) {
}),

c.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) {
return lo.Batch(
c.deriveWarpSyncThreshold(c.chains.LatestSeenSlot, engineInstance),
c.deriveOutOfSyncThreshold(c.chains.LatestSeenSlot, engineInstance),
)
return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
return outOfSyncThreshold(engineInstance, latestSeenSlot)
}, c.chains.LatestSeenSlot))
}),
)
}

// deriveWarpSyncMode defines how a chain determines whether it is in warp sync mode or not.
func (c *Chain) deriveWarpSyncMode() func() {
return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
// latest produced commitment is nil if we have not produced any commitment yet (intermediary state during
// startup)
if latestProducedCommitment == nil {
return warpSyncMode
}

// if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold
if warpSyncMode {
return latestProducedCommitment.ID().Slot() < warpSyncThreshold
}

// if warp sync mode is disabled, enable it only if we fall below the out of sync threshold
return latestProducedCommitment.ID().Slot() < outOfSyncThreshold
}, c.LatestProducedCommitment, c.WarpSyncThreshold, c.OutOfSyncThreshold, c.WarpSyncMode.Get()))
}

// deriveClaimedWeight defines how a chain determines its claimed weight (by setting the cumulative weight of the
// latest commitment).
func (c *Chain) deriveClaimedWeight() (shutdown func()) {
return c.ClaimedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestCommitment *Commitment) uint64 {
if latestCommitment == nil {
return 0
}

return latestCommitment.CumulativeWeight()
}, c.LatestCommitment))
}

// deriveLatestAttestedWeight defines how a chain determines its attested weight (by inheriting the cumulative attested
// weight of the latest attested commitment). It uses inheritance instead of simply setting the value as the cumulative
// attested weight can change over time depending on the attestations that are received.
func (c *Chain) deriveLatestAttestedWeight() func() {
return c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) {
return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight)
})
}

// deriveVerifiedWeight defines how a chain determines its verified weight (by setting the cumulative weight of the
// latest produced commitment).
func (c *Chain) deriveVerifiedWeight() func() {
return c.VerifiedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestProducedCommitment *Commitment) uint64 {
if latestProducedCommitment == nil {
return 0
}

return latestProducedCommitment.CumulativeWeight()
}, c.LatestProducedCommitment))
}

// deriveChildChains defines how a chain determines its ChildChains (by adding each child to the set).
func (c *Chain) deriveChildChains(child *Chain) func() {
c.ChildChains.Add(child)
Expand Down Expand Up @@ -310,31 +270,6 @@ func (c *Chain) deriveParentChain(forkingPoint *Commitment) (shutdown func()) {
return nil
}

// deriveOutOfSyncThreshold defines how a chain determines its "out of sync" threshold (the latest seen slot minus 2
// times the max committable age or 0 if this would cause an overflow to the negative numbers).
func (c *Chain) deriveOutOfSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() {
return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot {
return latestSeenSlot - outOfSyncOffset
}

return 0
}, latestSeenSlot))
}

// deriveWarpSyncThreshold defines how a chain determines its warp sync threshold (the latest seen slot minus the max
// committable age or 0 if this would cause an overflow to the negative numbers).
func (c *Chain) deriveWarpSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() {
return c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge()
if warpSyncOffset < latestSeenSlot {
return latestSeenSlot - warpSyncOffset
}

return 0
}, latestSeenSlot))
}

// addCommitment adds the given commitment to this chain.
func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {
c.commitments.Set(newCommitment.Slot(), newCommitment)
Expand All @@ -343,6 +278,7 @@ func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {

return lo.Batch(
newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }),
newCommitment.IsFullyBooked.OnTrigger(func() { c.LatestFullyBookedCommitment.Set(newCommitment) }),
newCommitment.IsCommitted.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }),
)
}
Expand All @@ -363,7 +299,7 @@ func (c *Chain) dispatchBlockToSpawnedEngine(block *model.Block, src peer.ID) (d
}

// perform additional checks if we are in warp sync mode (only let blocks pass that we requested)
if c.WarpSyncMode.Get() {
if c.WarpSyncModeEnabled.Get() {
// abort if the target commitment does not exist
targetCommitment, targetCommitmentExists := c.Commitment(targetSlot)
if !targetCommitmentExists {
Expand Down Expand Up @@ -400,3 +336,29 @@ func (c *Chain) verifiedWeight() reactive.Variable[uint64] {
func (c *Chain) attestedWeight() reactive.Variable[uint64] {
return c.AttestedWeight
}

// outOfSyncThreshold returns the slot index at which the node is considered out of sync.
func outOfSyncThreshold(engineInstance *engine.Engine, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot {
return latestSeenSlot - outOfSyncOffset
}

return 0
}

// warpSyncModeEnabled determines whether warp sync mode should be enabled or not.
func warpSyncModeEnabled(enabled bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
// latest produced commitment is nil if we have not produced any commitment yet (intermediary state during
// startup)
if latestFullyBookedCommitment == nil {
return enabled
}

// if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold
if enabled {
return latestFullyBookedCommitment.ID().Slot() < latestSeenSlot
}

// if warp sync mode is disabled, enable it only if we fall below the out of sync threshold
return latestFullyBookedCommitment.ID().Slot() < outOfSyncThreshold
}
25 changes: 17 additions & 8 deletions pkg/protocol/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,11 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
return lo.Batch(
// mark commitments that are marked as root as verified
c.IsCommitted.InheritFrom(c.IsRoot),
c.ReplayDroppedBlocks.InheritFrom(c.IsRoot),

// mark commitments that are marked as verified as attested, fully booked and committable
// mark commitments that are marked as verified as attested and fully booked
c.IsAttested.InheritFrom(c.IsCommitted),
c.IsFullyBooked.InheritFrom(c.IsCommitted),
c.IsCommittable.InheritFrom(c.IsCommitted),

c.Parent.WithNonEmptyValue(func(parent *Commitment) func() {
// the weight can be fixed as a one time operation (as it only relies on static information from the parent
Expand Down Expand Up @@ -274,17 +274,17 @@ func (c *Commitment) deriveRequestAttestations(chain *Chain, parent *Commitment)
// deriveWarpSyncBlocks derives the WarpSyncBlocks flag of this Commitment which is true if our Chain is requesting
// warp sync, and we are the directly above the latest verified Commitment.
func (c *Commitment) deriveWarpSyncBlocks(chain *Chain, parent *Commitment) func() {
return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSync bool, parentIsFullyBooked bool, isFullyBooked bool) bool {
return engineInstance != nil && warpSync && parentIsFullyBooked && !isFullyBooked
}, chain.Engine, chain.WarpSyncMode, parent.IsFullyBooked, c.IsFullyBooked))
return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSyncModeEnabled bool, parentIsFullyBooked bool, isFullyBooked bool) bool {
return engineInstance != nil && warpSyncModeEnabled && parentIsFullyBooked && !isFullyBooked
}, chain.Engine, chain.WarpSyncModeEnabled, parent.IsFullyBooked, c.IsFullyBooked))
}

// deriveReplayDroppedBlocks derives the ReplayDroppedBlocks flag of this Commitment which is true if our Chain has an
// engine, is no longer requesting warp sync, and we are above the latest verified Commitment.
func (c *Commitment) deriveReplayDroppedBlocks(chain *Chain) func() {
return c.ReplayDroppedBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, engineInstance *engine.Engine, warpSyncing bool, isAboveLatestVerifiedCommitment bool) bool {
return engineInstance != nil && !warpSyncing && isAboveLatestVerifiedCommitment
}, chain.Engine, chain.WarpSyncMode, c.IsAboveLatestVerifiedCommitment))
return c.ReplayDroppedBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, engineInstance *engine.Engine, warpSyncModeEnabled bool, isAboveLatestVerifiedCommitment bool) bool {
return engineInstance != nil && !warpSyncModeEnabled && isAboveLatestVerifiedCommitment
}, chain.Engine, chain.WarpSyncModeEnabled, c.IsAboveLatestVerifiedCommitment))
}

// forceChain forces the Chain of this Commitment to the given Chain by promoting it to the main child of its parent if
Expand All @@ -296,3 +296,12 @@ func (c *Commitment) forceChain(targetChain *Chain) {
}
}
}

// cumulativeWeight returns the cumulative weight of this Commitment while gracefully handling nil receivers.
func (c *Commitment) cumulativeWeight() uint64 {
if c == nil {
return 0
}

return c.CumulativeWeight()
}
9 changes: 9 additions & 0 deletions pkg/protocol/commitment_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/iotaledger/hive.go/ds"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
"github.com/iotaledger/iota-core/pkg/protocol/engine/accounts"
Expand All @@ -22,6 +23,9 @@ type CommitmentVerifier struct {
// validatorAccountsData is the accounts data of the validators for the current epoch as known at lastCommonSlotBeforeFork.
// Initially, it is set to the accounts data of the validators for the epoch of the last common commitment before the fork.
validatorAccountsData map[iotago.AccountID]*accounts.AccountData

// mutex is used to synchronize access to validatorAccountsData and epoch.
mutex syncutils.RWMutex
}

func newCommitmentVerifier(mainEngine *engine.Engine, lastCommonCommitmentBeforeFork *model.Commitment) (*CommitmentVerifier, error) {
Expand Down Expand Up @@ -76,6 +80,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
// This is necessary because the committee might have rotated at the epoch boundary and different validators might be part of it.
// In case anything goes wrong we keep using previously known accounts data (initially set to the accounts data
// of the validators for the epoch of the last common commitment before the fork).
c.mutex.Lock()
apiForSlot := c.engine.APIForSlot(commitment.Slot())
commitmentEpoch := apiForSlot.TimeProvider().EpochFromSlot(commitment.Slot())
if commitmentEpoch > c.epoch {
Expand All @@ -92,6 +97,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
}
}
}
c.mutex.Unlock()

// 3. Verify attestations.
blockIDs, seatCount, err := c.verifyAttestations(attestations)
Expand All @@ -107,6 +113,9 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
}

func (c *CommitmentVerifier) verifyAttestations(attestations []*iotago.Attestation) (iotago.BlockIDs, uint64, error) {
c.mutex.RLock()
defer c.mutex.RUnlock()

visitedIdentities := ds.NewSet[iotago.AccountID]()
var blockIDs iotago.BlockIDs
var seatCount uint64
Expand Down
Loading
Loading