Skip to content

Commit

Permalink
Refactor: refactored warpsync to be able to handle loss of acceptance
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoog committed Dec 3, 2023
1 parent 52694be commit 8ad929b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 54 deletions.
78 changes: 30 additions & 48 deletions pkg/protocol/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Chain struct {
// LatestAttestedCommitment contains the latest commitment of this chain for which attestations were received.
LatestAttestedCommitment reactive.Variable[*Commitment]

// LatestFullyBookedCommitment contains the latest commitment of this chain for which all blocks were booked.
LatestFullyBookedCommitment reactive.Variable[*Commitment]

// LatestProducedCommitment contains the latest commitment of this chain that we produced ourselves by booking the
// corresponding blocks in the Engine.
LatestProducedCommitment reactive.Variable[*Commitment]
Expand All @@ -48,10 +51,6 @@ type Chain struct {
// WarpSyncModeEnabled contains a flag that indicates whether this chain is in warp sync mode.
WarpSyncModeEnabled reactive.Variable[bool]

// WarpSyncThreshold contains the slot at which the chain will exit warp sync mode which is derived from the latest
// network slot minus the max committable age.
WarpSyncThreshold reactive.Variable[iotago.SlotIndex]

// OutOfSyncThreshold contains the slot at which the chain will consider itself to be out of sync and switch to warp
// sync mode. It is derived from the latest network slot minus two times the max committable age.
OutOfSyncThreshold reactive.Variable[iotago.SlotIndex]
Expand Down Expand Up @@ -83,22 +82,22 @@ type Chain struct {
// newChain creates a new chain instance.
func newChain(chains *Chains) *Chain {
c := &Chain{
ForkingPoint: reactive.NewVariable[*Commitment](),
ParentChain: reactive.NewVariable[*Chain](),
ChildChains: reactive.NewSet[*Chain](),
LatestCommitment: reactive.NewVariable[*Commitment](),
LatestAttestedCommitment: reactive.NewVariable[*Commitment](),
LatestProducedCommitment: reactive.NewVariable[*Commitment](),
ClaimedWeight: reactive.NewVariable[uint64](),
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSyncModeEnabled: reactive.NewVariable[bool]().Init(true),
WarpSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
RequestAttestations: reactive.NewVariable[bool](),
StartEngine: reactive.NewVariable[bool](),
Engine: reactive.NewVariable[*engine.Engine](),
IsEvicted: reactive.NewEvent(),
ForkingPoint: reactive.NewVariable[*Commitment](),
ParentChain: reactive.NewVariable[*Chain](),
ChildChains: reactive.NewSet[*Chain](),
LatestCommitment: reactive.NewVariable[*Commitment](),
LatestAttestedCommitment: reactive.NewVariable[*Commitment](),
LatestFullyBookedCommitment: reactive.NewVariable[*Commitment](),
LatestProducedCommitment: reactive.NewVariable[*Commitment](),
ClaimedWeight: reactive.NewVariable[uint64](),
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSyncModeEnabled: reactive.NewVariable[bool]().Init(true),
OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
RequestAttestations: reactive.NewVariable[bool](),
StartEngine: reactive.NewVariable[bool](),
Engine: reactive.NewVariable[*engine.Engine](),
IsEvicted: reactive.NewEvent(),

chains: chains,
commitments: shrinkingmap.New[iotago.SlotIndex, *Commitment](),
Expand Down Expand Up @@ -189,13 +188,14 @@ func (c *Chain) initLogger() (shutdown func()) {

return lo.Batch(
c.WarpSyncModeEnabled.LogUpdates(c, log.LevelTrace, "WarpSyncModeEnabled"),
c.WarpSyncThreshold.LogUpdates(c, log.LevelTrace, "WarpSyncThreshold"),
c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"),
c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName),
c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"),
c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"),
c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"),
c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName),
c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName),
c.LatestFullyBookedCommitment.LogUpdates(c, log.LevelTrace, "LatestFullyBookedCommitment", (*Commitment).LogName),
c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName),
c.RequestAttestations.LogUpdates(c, log.LevelTrace, "RequestAttestations"),
c.StartEngine.LogUpdates(c, log.LevelDebug, "StartEngine"),
Expand All @@ -217,9 +217,9 @@ func (c *Chain) initDerivedProperties() (shutdown func()) {
return latestProducedCommitment.cumulativeWeight()
}, c.LatestProducedCommitment)),

c.WarpSyncModeEnabled.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
return warpSyncModeEnabled(warpSyncMode, latestProducedCommitment, warpSyncThreshold, outOfSyncThreshold)
}, c.LatestProducedCommitment, c.WarpSyncThreshold, c.OutOfSyncThreshold, c.WarpSyncModeEnabled.Get())),
c.WarpSyncModeEnabled.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestFullyBookedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
return warpSyncModeEnabled(warpSyncMode, latestFullyBookedCommitment, latestSeenSlot, outOfSyncThreshold)
}, c.LatestFullyBookedCommitment, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncModeEnabled.Get())),

c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) {
return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight)
Expand All @@ -234,15 +234,9 @@ func (c *Chain) initDerivedProperties() (shutdown func()) {
}),

c.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) {
return lo.Batch(
c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
return warpSyncThreshold(engineInstance, latestSeenSlot)
}, c.chains.LatestSeenSlot)),

c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
return outOfSyncThreshold(engineInstance, latestSeenSlot)
}, c.chains.LatestSeenSlot)),
)
return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
return outOfSyncThreshold(engineInstance, latestSeenSlot)
}, c.chains.LatestSeenSlot))
}),
)
}
Expand Down Expand Up @@ -284,6 +278,7 @@ func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {

return lo.Batch(
newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }),
newCommitment.IsFullyBooked.OnTrigger(func() { c.LatestFullyBookedCommitment.Set(newCommitment) }),
newCommitment.IsCommitted.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }),
)
}
Expand Down Expand Up @@ -342,19 +337,6 @@ func (c *Chain) attestedWeight() reactive.Variable[uint64] {
return c.AttestedWeight
}

// warpSyncThreshold returns the slot index at which the warp sync should stop.
func warpSyncThreshold(engineInstance *engine.Engine, latestSlot iotago.SlotIndex) iotago.SlotIndex {
// TODO: explain why we do - 1 here
warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MinCommittableAge() - 1

// prevent overflow to negative numbers
if warpSyncOffset >= latestSlot {
return 0
}

return latestSlot - warpSyncOffset
}

// outOfSyncThreshold returns the slot index at which the node is considered out of sync.
func outOfSyncThreshold(engineInstance *engine.Engine, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot {
Expand All @@ -365,7 +347,7 @@ func outOfSyncThreshold(engineInstance *engine.Engine, latestSeenSlot iotago.Slo
}

// warpSyncModeEnabled determines whether warp sync mode should be enabled or not.
func warpSyncModeEnabled(enabled bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
func warpSyncModeEnabled(enabled bool, latestProducedCommitment *Commitment, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
// latest produced commitment is nil if we have not produced any commitment yet (intermediary state during
// startup)
if latestProducedCommitment == nil {
Expand All @@ -374,7 +356,7 @@ func warpSyncModeEnabled(enabled bool, latestProducedCommitment *Commitment, war

// if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold
if enabled {
return latestProducedCommitment.ID().Slot() < warpSyncThreshold
return latestProducedCommitment.ID().Slot() < latestSeenSlot
}

// if warp sync mode is disabled, enable it only if we fall below the out of sync threshold
Expand Down
3 changes: 1 addition & 2 deletions pkg/protocol/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,9 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
c.IsCommitted.InheritFrom(c.IsRoot),
c.ReplayDroppedBlocks.InheritFrom(c.IsRoot),

// mark commitments that are marked as verified as attested, fully booked and committable
// mark commitments that are marked as verified as attested and fully booked
c.IsAttested.InheritFrom(c.IsCommitted),
c.IsFullyBooked.InheritFrom(c.IsCommitted),
c.IsCommittable.InheritFrom(c.IsCommitted),

c.Parent.WithNonEmptyValue(func(parent *Commitment) func() {
// the weight can be fixed as a one time operation (as it only relies on static information from the parent
Expand Down
16 changes: 12 additions & 4 deletions pkg/protocol/protocol_warp_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,17 +247,25 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo
})
}

// Once all blocks are fully booked we can mark the commitment that is minCommittableAge older as this
// commitment to be committable.
commitment.IsFullyBooked.OnUpdateOnce(func(_ bool, _ bool) {
if committableCommitment, exists := chain.Commitment(warpSyncThreshold(targetEngine, commitmentID.Slot())); exists {
committableCommitment.IsCommittable.Set(true)
if committableCommitment, exists := chain.Commitment(commitmentID.Slot() - targetEngine.LatestAPI().ProtocolParameters().MinCommittableAge()); exists {
w.workerPool.Submit(func() {
committableCommitment.IsCommittable.Set(true)
})
}
})

commitment.IsCommittable.OnUpdateOnce(func(_ bool, _ bool) {
w.workerPool.Submit(forceCommitmentFunc)
// force commit one by one and wait for the parent to be committed before we can commit the next one
commitment.Parent.WithNonEmptyValue(func(parent *Commitment) (teardown func()) {
return parent.IsCommitted.WithNonEmptyValue(func(_ bool) (teardown func()) {
return commitment.IsCommittable.OnTrigger(forceCommitmentFunc)
})
})

if totalBlocks == 0 {
commitment.IsCommittable.Set(true)
commitment.IsFullyBooked.Set(true)

return blocksToWarpSync
Expand Down

0 comments on commit 8ad929b

Please sign in to comment.