Skip to content

Commit

Permalink
Merge pull request #630 from iotaledger/feat/metastability-breaker
Browse files Browse the repository at this point in the history
Feat: Add metastability breaking mechanism
  • Loading branch information
jonastheis authored Jan 26, 2024
2 parents 95c0bc1 + 69963d1 commit c62c95e
Show file tree
Hide file tree
Showing 14 changed files with 634 additions and 147 deletions.
20 changes: 14 additions & 6 deletions pkg/protocol/attestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ func (a *Attestations) initRequester() (shutdown func()) {
unsubscribeFromTicker := lo.Batch(
a.protocol.Commitments.WithElements(func(commitment *Commitment) (shutdown func()) {
return commitment.RequestAttestations.WithNonEmptyValue(func(_ bool) (teardown func()) {
if commitment.CumulativeWeight() == 0 {
commitment.IsAttested.Set(true)
if commitment.CumulativeWeight.Get() == 0 {
// execute in worker pool since it can have long-running effects (e.g. chain switching)
a.workerPool.Submit(func() {
commitment.IsAttested.Set(true)
})

return nil
}
Expand Down Expand Up @@ -161,7 +164,10 @@ func (a *Attestations) processResponse(commitment *model.Commitment, attestation
return
}

if publishedCommitment.AttestedWeight.Compute(func(currentWeight uint64) uint64 {
var wasAttested, attestationsUpdated bool
publishedCommitment.AttestedWeight.Compute(func(currentWeight uint64) uint64 {
wasAttested = currentWeight > 0

if !publishedCommitment.RequestAttestations.Get() {
a.LogTrace("received attestations for previously attested commitment", "commitment", publishedCommitment.LogName())

Expand Down Expand Up @@ -189,12 +195,14 @@ func (a *Attestations) processResponse(commitment *model.Commitment, attestation
return currentWeight
}

if actualWeight > currentWeight {
a.LogDebug("received response", "commitment", publishedCommitment.LogName(), "fromPeer", from)
if attestationsUpdated = actualWeight > currentWeight; attestationsUpdated {
a.LogDebug("received response", "commitment", publishedCommitment.LogName(), "weight", actualWeight, "fromPeer", from)
}

return actualWeight
}) > 0 {
})

if !wasAttested && attestationsUpdated {
publishedCommitment.IsAttested.Set(true)
}
})
Expand Down
72 changes: 0 additions & 72 deletions pkg/protocol/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,6 @@ type Chain struct {
// corresponding blocks in the Engine.
LatestProducedCommitment reactive.Variable[*Commitment]

// ClaimedWeight contains the claimed weight of this chain which is derived from the cumulative weight of the
// LatestCommitment.
ClaimedWeight reactive.Variable[uint64]

// AttestedWeight contains the attested weight of this chain which is derived from the cumulative weight of all
// attestations up to the LatestAttestedCommitment.
AttestedWeight reactive.Variable[uint64]

// VerifiedWeight contains the verified weight of this chain which is derived from the cumulative weight of the
// latest verified commitment.
VerifiedWeight reactive.Variable[uint64]

// WarpSyncMode contains a flag that indicates whether this chain is in warp sync mode.
WarpSyncMode reactive.Variable[bool]

Expand Down Expand Up @@ -88,9 +76,6 @@ func newChain(chains *Chains) *Chain {
LatestCommitment: reactive.NewVariable[*Commitment](),
LatestAttestedCommitment: reactive.NewVariable[*Commitment](),
LatestProducedCommitment: reactive.NewVariable[*Commitment](),
ClaimedWeight: reactive.NewVariable[uint64](),
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSyncMode: reactive.NewVariable[bool]().Init(true),
LatestSyncedSlot: reactive.NewVariable[iotago.SlotIndex](),
OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
Expand Down Expand Up @@ -189,9 +174,6 @@ func (c *Chain) initLogger() (shutdown func()) {
c.LatestSyncedSlot.LogUpdates(c, log.LevelTrace, "LatestSyncedSlot"),
c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"),
c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName),
c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"),
c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"),
c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"),
c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName),
c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName),
c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName),
Expand All @@ -207,9 +189,6 @@ func (c *Chain) initLogger() (shutdown func()) {
// initDerivedProperties initializes the behavior of this chain by setting up the relations between its properties.
func (c *Chain) initDerivedProperties() (shutdown func()) {
return lo.Batch(
c.deriveClaimedWeight(),
c.deriveVerifiedWeight(),
c.deriveLatestAttestedWeight(),
c.deriveWarpSyncMode(),

c.ForkingPoint.WithValue(c.deriveParentChain),
Expand All @@ -231,39 +210,6 @@ func (c *Chain) deriveWarpSyncMode() func() {
}, c.LatestSyncedSlot, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncMode.Get()))
}

// deriveClaimedWeight defines how a chain determines its claimed weight (by setting the cumulative weight of the
// latest commitment).
func (c *Chain) deriveClaimedWeight() (shutdown func()) {
return c.ClaimedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestCommitment *Commitment) uint64 {
if latestCommitment == nil {
return 0
}

return latestCommitment.CumulativeWeight()
}, c.LatestCommitment))
}

// deriveLatestAttestedWeight defines how a chain determines its attested weight (by inheriting the cumulative attested
// weight of the latest attested commitment). It uses inheritance instead of simply setting the value as the cumulative
// attested weight can change over time depending on the attestations that are received.
func (c *Chain) deriveLatestAttestedWeight() func() {
return c.LatestAttestedCommitment.WithNonEmptyValue(func(latestAttestedCommitment *Commitment) (shutdown func()) {
return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight)
})
}

// deriveVerifiedWeight defines how a chain determines its verified weight (by setting the cumulative weight of the
// latest produced commitment).
func (c *Chain) deriveVerifiedWeight() func() {
return c.VerifiedWeight.DeriveValueFrom(reactive.NewDerivedVariable(func(_ uint64, latestProducedCommitment *Commitment) uint64 {
if latestProducedCommitment == nil {
return 0
}

return latestProducedCommitment.CumulativeWeight()
}, c.LatestProducedCommitment))
}

// deriveChildChains defines how a chain determines its ChildChains (by adding each child to the set).
func (c *Chain) deriveChildChains(child *Chain) func() {
c.ChildChains.Add(child)
Expand Down Expand Up @@ -353,21 +299,3 @@ func (c *Chain) dispatchBlockToSpawnedEngine(block *model.Block, src peer.ID) (d

return true
}

// claimedWeight is a getter for the ClaimedWeight variable of this chain, which is internally used to be able to
// "address" the variable across multiple chains in a generic way.
func (c *Chain) claimedWeight() reactive.Variable[uint64] {
return c.ClaimedWeight
}

// verifiedWeight is a getter for the VerifiedWeight variable of this chain, which is internally used to be able to
// "address" the variable across multiple chains in a generic way.
func (c *Chain) verifiedWeight() reactive.Variable[uint64] {
return c.VerifiedWeight
}

// attestedWeight is a getter for the AttestedWeight variable of this chain, which is internally used to be able to
// "address" the variable across multiple chains in a generic way.
func (c *Chain) attestedWeight() reactive.Variable[uint64] {
return c.AttestedWeight
}
Loading

0 comments on commit c62c95e

Please sign in to comment.