Skip to content
This repository has been archived by the owner on Jan 24, 2025. It is now read-only.

Fix: WarpSync #568

Merged
merged 65 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
b6878f4
Fix: Fix WarpSync
hmoog Nov 27, 2023
0539f34
Feat: added more logic
hmoog Nov 27, 2023
e82a4be
Fix: fixed more code
hmoog Nov 27, 2023
ab3fdec
Trigger Commitment's IsCommittable at - MCA + 1
karimodm Nov 27, 2023
66807e0
Deadlock fix
karimodm Nov 27, 2023
18ab2ef
Merge branch 'feat/reactive-chainmanager' into fix/warpSync
karimodm Nov 28, 2023
000355b
Force accept and commit with MCA delay
karimodm Nov 28, 2023
58a9972
Add rootblocks only before force-accepting them
karimodm Nov 28, 2023
d1cfa46
Merge remote-tracking branch 'origin/feat/reactive-chainmanager' into…
karimodm Nov 28, 2023
9e9697c
Reset the Engine when WarpSyncMode turns true
karimodm Nov 28, 2023
4338642
Merge branch 'feat/reactive-chainmanager' of github.com:iotaledger/io…
hmoog Nov 28, 2023
51431c7
Fix: TestLossOfAcceptanceFromGenesis test fixed
hmoog Nov 28, 2023
ef9927b
Merge branch 'feat/reactive-chainmanager' of github.com:iotaledger/io…
hmoog Nov 29, 2023
b4d6346
Feat: increased timeout
hmoog Nov 29, 2023
e1bfcdf
Feat: extend waiting time?
hmoog Nov 29, 2023
1e4b2f2
Feat: Added PR for bundled changes while debugging
hmoog Nov 29, 2023
a467a26
Feat: added more logs
hmoog Nov 29, 2023
ed38887
Feat: added log output
hmoog Nov 29, 2023
0db2bf0
Feat: changed some code
hmoog Nov 30, 2023
13cd73a
Feat: added log
hmoog Nov 30, 2023
8f60e3d
Feat: add logging
hmoog Nov 30, 2023
a524987
Feat: added logging
hmoog Nov 30, 2023
ff87a2e
Feat: added solidity stuff to the logging - let's see
hmoog Nov 30, 2023
8b97187
Feat: add logging
hmoog Nov 30, 2023
7b8abd6
Merge branch 'feat/reactive-chainmanager' of github.com:iotaledger/io…
hmoog Nov 30, 2023
aa6642e
Change warp sync to wait for WeightPropagated instead of Booked of bl…
jonastheis Dec 1, 2023
bb338af
Adjust TestProtocol_EngineSwitching_CommitteeRotation to wait for the…
jonastheis Dec 1, 2023
aeb9258
Fix calculation of WarpSyncThreshold
jonastheis Dec 1, 2023
62a9cb4
WarpSyncMode should be enabled if latestProducedCommitment == nil
jonastheis Dec 1, 2023
7243706
Feat: fixed some stuff
hmoog Dec 1, 2023
66e843a
Merge branch 'fix/warpSync' of github.com:iotaledger/iota-core into d…
hmoog Dec 1, 2023
32ebf8b
Fix: fixed tests
hmoog Dec 1, 2023
b3049b5
Refactor: reverted changes
hmoog Dec 1, 2023
9c845ec
Refactor: reverted more changes
hmoog Dec 1, 2023
1e8e9dd
Refactor: reverted more changes
hmoog Dec 1, 2023
52694be
Fix: fixed bugs and refactored code
hmoog Dec 1, 2023
8ad929b
Refactor: refactored warpsync to be able to handle loss of acceptance
hmoog Dec 3, 2023
89f0213
Refactor: fixed typo + race condition
hmoog Dec 3, 2023
af324c3
Fix bugs in WarpSync logic (#577)
hmoog Dec 3, 2023
6eae1b4
Refactor: started reverting unnecessary changes
hmoog Dec 3, 2023
14f7108
Merge branch 'fix/warpSync' of github.com:iotaledger/iota-core into d…
hmoog Dec 3, 2023
7675908
Refactor: reverted changes
hmoog Dec 3, 2023
8a2a19e
Refactor: reverted rename
hmoog Dec 3, 2023
aa420c9
Refactor: addressed linter issues
hmoog Dec 3, 2023
dd8771a
Refactor: reduced changes
hmoog Dec 3, 2023
044b797
Refactor: reverted unnecessary changes
hmoog Dec 3, 2023
010e103
Refactor: reverted more changes
hmoog Dec 3, 2023
a3a5c72
Refactor: minimizing more changes
hmoog Dec 3, 2023
0ce46e7
Refactor: reverted more code
hmoog Dec 3, 2023
b6b430f
Refactor: revert
hmoog Dec 3, 2023
5beda14
Refactor: revert
hmoog Dec 3, 2023
2b819f9
Refactor: reverted more
hmoog Dec 3, 2023
75240f0
Refactor: revert
hmoog Dec 3, 2023
e83a78e
Fix: fix possible nil pointer exception
hmoog Dec 3, 2023
90c1fd3
Refactor: renamed flags
hmoog Dec 4, 2023
8b5b5dc
Refactor: finished rename
hmoog Dec 4, 2023
5241622
Refactor: refactored warpsync protocol
hmoog Dec 4, 2023
c6a7242
Feat: updated comments
hmoog Dec 4, 2023
5fb3535
Refactor: updated comments
hmoog Dec 4, 2023
bfe40ba
Merge remote-tracking branch 'origin/feat/reactive-chainmanager' into…
jonastheis Dec 4, 2023
6044a6d
Merge branch 'feat/reactive-chainmanager' into fix/warpSync
karimodm Dec 4, 2023
9576fd6
Do not mark blocks as root blocks when warp syncing as that is done a…
jonastheis Dec 4, 2023
d8d6430
Address review comments
jonastheis Dec 4, 2023
1419993
Merge branch 'fix/warpSync' of github.com:iotaledger/iota-core into f…
jonastheis Dec 4, 2023
ac6a993
Fix deadlock
jonastheis Dec 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 16 additions & 43 deletions pkg/protocol/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ type Chain struct {
// WarpSyncMode contains a flag that indicates whether this chain is in warp sync mode.
WarpSyncMode reactive.Variable[bool]

// WarpSyncThreshold contains the slot at which the chain will exit warp sync mode which is derived from the latest
// network slot minus the max committable age.
WarpSyncThreshold reactive.Variable[iotago.SlotIndex]
// LatestSyncedSlot contains the latest commitment of this chain for which all blocks were booked.
LatestSyncedSlot reactive.Variable[iotago.SlotIndex]

// OutOfSyncThreshold contains the slot at which the chain will consider itself to be out of sync and switch to warp
// sync mode. It is derived from the latest network slot minus two times the max committable age.
Expand Down Expand Up @@ -93,7 +92,7 @@ func newChain(chains *Chains) *Chain {
AttestedWeight: reactive.NewVariable[uint64](),
VerifiedWeight: reactive.NewVariable[uint64](),
WarpSyncMode: reactive.NewVariable[bool]().Init(true),
WarpSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
LatestSyncedSlot: reactive.NewVariable[iotago.SlotIndex](),
OutOfSyncThreshold: reactive.NewVariable[iotago.SlotIndex](),
RequestAttestations: reactive.NewVariable[bool](),
StartEngine: reactive.NewVariable[bool](),
Expand Down Expand Up @@ -187,13 +186,14 @@ func (c *Chain) initLogger() (shutdown func()) {

return lo.Batch(
c.WarpSyncMode.LogUpdates(c, log.LevelTrace, "WarpSyncMode"),
c.WarpSyncThreshold.LogUpdates(c, log.LevelTrace, "WarpSyncThreshold"),
c.LatestSyncedSlot.LogUpdates(c, log.LevelTrace, "LatestSyncedSlot"),
c.OutOfSyncThreshold.LogUpdates(c, log.LevelTrace, "OutOfSyncThreshold"),
c.ForkingPoint.LogUpdates(c, log.LevelTrace, "ForkingPoint", (*Commitment).LogName),
c.ClaimedWeight.LogUpdates(c, log.LevelTrace, "ClaimedWeight"),
c.AttestedWeight.LogUpdates(c, log.LevelTrace, "AttestedWeight"),
c.VerifiedWeight.LogUpdates(c, log.LevelTrace, "VerifiedWeight"),
c.LatestCommitment.LogUpdates(c, log.LevelTrace, "LatestCommitment", (*Commitment).LogName),
c.LatestAttestedCommitment.LogUpdates(c, log.LevelTrace, "LatestAttestedCommitment", (*Commitment).LogName),
c.LatestProducedCommitment.LogUpdates(c, log.LevelDebug, "LatestProducedCommitment", (*Commitment).LogName),
c.RequestAttestations.LogUpdates(c, log.LevelTrace, "RequestAttestations"),
c.StartEngine.LogUpdates(c, log.LevelDebug, "StartEngine"),
Expand All @@ -212,39 +212,23 @@ func (c *Chain) initDerivedProperties() (shutdown func()) {
c.deriveLatestAttestedWeight(),
c.deriveWarpSyncMode(),

c.ForkingPoint.WithValue(func(forkingPoint *Commitment) (shutdown func()) {
return c.deriveParentChain(forkingPoint)
}),

c.ParentChain.WithNonEmptyValue(func(parentChain *Chain) (shutdown func()) {
return parentChain.deriveChildChains(c)
}),

c.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) {
return lo.Batch(
c.deriveWarpSyncThreshold(c.chains.LatestSeenSlot, engineInstance),
c.deriveOutOfSyncThreshold(c.chains.LatestSeenSlot, engineInstance),
)
}),
c.ForkingPoint.WithValue(c.deriveParentChain),
c.ParentChain.WithNonEmptyValue(lo.Bind(c, (*Chain).deriveChildChains)),
c.Engine.WithNonEmptyValue(c.deriveOutOfSyncThreshold),
)
}

// deriveWarpSyncMode defines how a chain determines whether it is in warp sync mode or not.
func (c *Chain) deriveWarpSyncMode() func() {
return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestProducedCommitment *Commitment, warpSyncThreshold iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
// if we have no latest produced commitment, then the engine is not yet initialized and warp sync is disabled
if latestProducedCommitment == nil {
return false
}

// if warp sync mode is enabled, keep it enabled until we are no longer below the warp sync threshold
return c.WarpSyncMode.DeriveValueFrom(reactive.NewDerivedVariable3(func(warpSyncMode bool, latestSyncedSlot iotago.SlotIndex, latestSeenSlot iotago.SlotIndex, outOfSyncThreshold iotago.SlotIndex) bool {
// if warp sync mode is enabled, keep it enabled until we have synced all slots
if warpSyncMode {
return latestProducedCommitment.ID().Slot() < warpSyncThreshold
return latestSyncedSlot < latestSeenSlot
}

// if warp sync mode is disabled, enable it only if we fall below the out of sync threshold
return latestProducedCommitment.ID().Slot() < outOfSyncThreshold
}, c.LatestProducedCommitment, c.WarpSyncThreshold, c.OutOfSyncThreshold, c.WarpSyncMode.Get()))
return latestSyncedSlot < outOfSyncThreshold
}, c.LatestSyncedSlot, c.chains.LatestSeenSlot, c.OutOfSyncThreshold, c.WarpSyncMode.Get()))
}

// deriveClaimedWeight defines how a chain determines its claimed weight (by setting the cumulative weight of the
Expand Down Expand Up @@ -311,26 +295,14 @@ func (c *Chain) deriveParentChain(forkingPoint *Commitment) (shutdown func()) {

// deriveOutOfSyncThreshold defines how a chain determines its "out of sync" threshold (the latest seen slot minus 2
// times the max committable age or 0 if this would cause an overflow to the negative numbers).
func (c *Chain) deriveOutOfSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() {
func (c *Chain) deriveOutOfSyncThreshold(engineInstance *engine.Engine) func() {
return c.OutOfSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
if outOfSyncOffset := 2 * engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); outOfSyncOffset < latestSeenSlot {
return latestSeenSlot - outOfSyncOffset
}

return 0
}, latestSeenSlot))
}

// deriveWarpSyncThreshold defines how a chain determines its warp sync threshold (the latest seen slot minus the max
// committable age or 0 if this would cause an overflow to the negative numbers).
func (c *Chain) deriveWarpSyncThreshold(latestSeenSlot reactive.ReadableVariable[iotago.SlotIndex], engineInstance *engine.Engine) func() {
return c.WarpSyncThreshold.DeriveValueFrom(reactive.NewDerivedVariable(func(_ iotago.SlotIndex, latestSeenSlot iotago.SlotIndex) iotago.SlotIndex {
if warpSyncOffset := engineInstance.LatestAPI().ProtocolParameters().MaxCommittableAge(); warpSyncOffset < latestSeenSlot {
return latestSeenSlot - warpSyncOffset
}

return 0
}, latestSeenSlot))
}, c.chains.LatestSeenSlot))
}

// addCommitment adds the given commitment to this chain.
Expand All @@ -342,6 +314,7 @@ func (c *Chain) addCommitment(newCommitment *Commitment) (shutdown func()) {
return lo.Batch(
newCommitment.IsAttested.OnTrigger(func() { c.LatestAttestedCommitment.Set(newCommitment) }),
newCommitment.IsVerified.OnTrigger(func() { c.LatestProducedCommitment.Set(newCommitment) }),
newCommitment.IsSynced.OnTrigger(func() { c.LatestSyncedSlot.Set(newCommitment.Slot()) }),
)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/protocol/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Chains struct {
// HeaviestVerifiedCandidate contains the candidate chain with the heaviest verified weight.
HeaviestVerifiedCandidate reactive.Variable[*Chain]

// LatestSeenSlot contains the latest slot that was seen by any of the chains.
// LatestSeenSlot contains the slot of the latest commitment of any received block.
LatestSeenSlot reactive.Variable[iotago.SlotIndex]

// protocol contains a reference to the Protocol instance that this component belongs to.
Expand Down Expand Up @@ -158,7 +158,7 @@ func (c *Chains) deriveLatestSeenSlot(protocol *Protocol) func() {
}),

protocol.Network.OnBlockReceived(func(block *model.Block, src peer.ID) {
c.LatestSeenSlot.Set(mainEngine.LatestAPI().TimeProvider().SlotFromTime(block.ProtocolBlock().Header.IssuingTime))
c.LatestSeenSlot.Set(block.ProtocolBlock().Header.SlotCommitmentID.Slot())
}),
)
})
Expand Down
31 changes: 23 additions & 8 deletions pkg/protocol/commitment.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ type Commitment struct {
// IsAttested contains a flag indicating if we have received attestations for this Commitment.
IsAttested reactive.Event

// IsSynced contains a flag that indicates if a Commitment was fully downloaded and processed.
IsSynced reactive.Event

// IsCommittable contains a flag that indicates if a Commitment is ready to be committed by the warp sync process.
IsCommittable reactive.Event

// IsVerified contains a flag indicating if this Commitment is verified (we produced this Commitment ourselves by
// booking all the contained blocks and transactions).
IsVerified reactive.Event
Expand All @@ -65,7 +71,7 @@ type Commitment struct {
IsAboveLatestVerifiedCommitment reactive.Variable[bool]

// ReplayDroppedBlocks contains a flag indicating if we should replay the blocks that were dropped while the
//Commitment was pending.
// Commitment was pending.
ReplayDroppedBlocks reactive.Variable[bool]

// IsEvicted contains a flag indicating if this Commitment was evicted from the Protocol.
Expand Down Expand Up @@ -94,6 +100,8 @@ func newCommitment(commitments *Commitments, model *model.Commitment) *Commitmen
CumulativeAttestedWeight: reactive.NewVariable[uint64](),
IsRoot: reactive.NewEvent(),
IsAttested: reactive.NewEvent(),
IsSynced: reactive.NewEvent(),
IsCommittable: reactive.NewEvent(),
IsVerified: reactive.NewEvent(),
IsAboveLatestVerifiedCommitment: reactive.NewVariable[bool](),
ReplayDroppedBlocks: reactive.NewVariable[bool](),
Expand Down Expand Up @@ -135,6 +143,8 @@ func (c *Commitment) initLogger() (shutdown func()) {
c.CumulativeAttestedWeight.LogUpdates(c, log.LevelTrace, "CumulativeAttestedWeight"),
c.IsRoot.LogUpdates(c, log.LevelTrace, "IsRoot"),
c.IsAttested.LogUpdates(c, log.LevelTrace, "IsAttested"),
c.IsSynced.LogUpdates(c, log.LevelTrace, "IsSynced"),
c.IsCommittable.LogUpdates(c, log.LevelTrace, "IsCommittable"),
c.IsVerified.LogUpdates(c, log.LevelTrace, "IsVerified"),
c.ReplayDroppedBlocks.LogUpdates(c, log.LevelTrace, "ReplayDroppedBlocks"),
c.IsEvicted.LogUpdates(c, log.LevelTrace, "IsEvicted"),
Expand All @@ -149,8 +159,9 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
// mark commitments that are marked as root as verified
c.IsVerified.InheritFrom(c.IsRoot),

// mark commitments that are marked as verified as attested
// mark commitments that are marked as verified as attested and synced
c.IsAttested.InheritFrom(c.IsVerified),
c.IsSynced.InheritFrom(c.IsVerified),

c.Parent.WithNonEmptyValue(func(parent *Commitment) func() {
// the weight can be fixed as a one time operation (as it only relies on static information from the parent
Expand All @@ -167,7 +178,11 @@ func (c *Commitment) initDerivedProperties() (shutdown func()) {
c.Chain.WithNonEmptyValue(func(chain *Chain) func() {
return lo.Batch(
c.deriveRequestAttestations(chain, parent),
c.deriveWarpSyncBlocks(chain, parent),

// only start requesting blocks once the engine is ready
chain.WithInitializedEngine(func(_ *engine.Engine) (shutdown func()) {
return c.deriveWarpSyncBlocks(chain, parent)
}),
)
}),
)
Expand Down Expand Up @@ -259,11 +274,11 @@ func (c *Commitment) deriveRequestAttestations(chain *Chain, parent *Commitment)
}

// deriveWarpSyncBlocks derives the WarpSyncBlocks flag of this Commitment which is true if our Chain is requesting
jonastheis marked this conversation as resolved.
Show resolved Hide resolved
// warp sync, and we are the directly above the latest verified Commitment.
// warp sync, and we are the directly above the latest commitment that is synced (has downloaded everything).
func (c *Commitment) deriveWarpSyncBlocks(chain *Chain, parent *Commitment) func() {
return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable4(func(_ bool, engineInstance *engine.Engine, warpSync bool, parentIsVerified bool, isVerified bool) bool {
return engineInstance != nil && warpSync && parentIsVerified && !isVerified
}, chain.Engine, chain.WarpSyncMode, parent.IsVerified, c.IsVerified))
return c.WarpSyncBlocks.DeriveValueFrom(reactive.NewDerivedVariable3(func(_ bool, warpSyncMode bool, parentIsSynced bool, isSynced bool) bool {
return warpSyncMode && parentIsSynced && !isSynced
}, chain.WarpSyncMode, parent.IsSynced, c.IsSynced))
}

// deriveReplayDroppedBlocks derives the ReplayDroppedBlocks flag of this Commitment which is true if our Chain has an
Expand All @@ -278,7 +293,7 @@ func (c *Commitment) deriveReplayDroppedBlocks(chain *Chain) func() {
// the parent is on the target Chain.
func (c *Commitment) forceChain(targetChain *Chain) {
if currentChain := c.Chain.Get(); currentChain != targetChain {
if parent := c.Parent.Get(); parent.Chain.Get() == targetChain {
if parent := c.Parent.Get(); parent != nil && parent.Chain.Get() == targetChain {
parent.MainChild.Set(c)
}
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/protocol/commitment_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/iotaledger/hive.go/ds"
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore/mapdb"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/iota-core/pkg/model"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
"github.com/iotaledger/iota-core/pkg/protocol/engine/accounts"
Expand All @@ -22,6 +23,9 @@ type CommitmentVerifier struct {
// validatorAccountsData is the accounts data of the validators for the current epoch as known at lastCommonSlotBeforeFork.
// Initially, it is set to the accounts data of the validators for the epoch of the last common commitment before the fork.
validatorAccountsData map[iotago.AccountID]*accounts.AccountData

// mutex is used to synchronize access to validatorAccountsData and epoch.
mutex syncutils.RWMutex
}

func newCommitmentVerifier(mainEngine *engine.Engine, lastCommonCommitmentBeforeFork *model.Commitment) (*CommitmentVerifier, error) {
Expand Down Expand Up @@ -76,6 +80,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
// This is necessary because the committee might have rotated at the epoch boundary and different validators might be part of it.
// In case anything goes wrong we keep using previously known accounts data (initially set to the accounts data
// of the validators for the epoch of the last common commitment before the fork).
c.mutex.Lock()
apiForSlot := c.engine.APIForSlot(commitment.Slot())
commitmentEpoch := apiForSlot.TimeProvider().EpochFromSlot(commitment.Slot())
if commitmentEpoch > c.epoch {
Expand All @@ -92,6 +97,7 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
}
}
}
c.mutex.Unlock()

// 3. Verify attestations.
blockIDs, seatCount, err := c.verifyAttestations(attestations)
Expand All @@ -107,6 +113,9 @@ func (c *CommitmentVerifier) verifyCommitment(commitment *Commitment, attestatio
}

func (c *CommitmentVerifier) verifyAttestations(attestations []*iotago.Attestation) (iotago.BlockIDs, uint64, error) {
c.mutex.RLock()
defer c.mutex.RUnlock()

visitedIdentities := ds.NewSet[iotago.AccountID]()
var blockIDs iotago.BlockIDs
var seatCount uint64
Expand Down
12 changes: 6 additions & 6 deletions pkg/protocol/engine/blocks/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type Block struct {
dropped bool

// Notarization
notarized reactive.Variable[bool]
notarized reactive.Event

mutex syncutils.RWMutex

Expand Down Expand Up @@ -88,7 +88,7 @@ func NewBlock(data *model.Block) *Block {
booked: reactive.NewVariable[bool](),
accepted: reactive.NewVariable[bool](),
weightPropagated: reactive.NewVariable[bool](),
notarized: reactive.NewVariable[bool](),
notarized: reactive.NewEvent(),
workScore: data.WorkScore(),
}
}
Expand All @@ -112,7 +112,7 @@ func NewRootBlock(blockID iotago.BlockID, commitmentID iotago.CommitmentID, issu
preAccepted: true,
accepted: reactive.NewVariable[bool](),
weightPropagated: reactive.NewVariable[bool](),
notarized: reactive.NewVariable[bool](),
notarized: reactive.NewEvent(),
scheduled: true,
}

Expand Down Expand Up @@ -140,7 +140,7 @@ func NewMissingBlock(blockID iotago.BlockID) *Block {
booked: reactive.NewVariable[bool](),
accepted: reactive.NewVariable[bool](),
weightPropagated: reactive.NewVariable[bool](),
notarized: reactive.NewVariable[bool](),
notarized: reactive.NewEvent(),
}
}

Expand Down Expand Up @@ -622,7 +622,7 @@ func (b *Block) SetWeightPropagated() (wasUpdated bool) {
return !b.weightPropagated.Set(true)
}

func (b *Block) Notarized() reactive.Variable[bool] {
func (b *Block) Notarized() reactive.Event {
return b.notarized
}

Expand All @@ -631,7 +631,7 @@ func (b *Block) IsNotarized() (isBooked bool) {
}

func (b *Block) SetNotarized() (wasUpdated bool) {
return !b.notarized.Set(true)
return b.notarized.Trigger()
}

func (b *Block) String() string {
Expand Down
12 changes: 6 additions & 6 deletions pkg/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func New(logger log.Logger, workers *workerpool.Group, networkEndpoint network.E

p.Constructed.Trigger()

p.waitMainEngineInitialized()
p.waitInitialized()
})
}

Expand Down Expand Up @@ -204,14 +204,14 @@ func (p *Protocol) initNetwork() (shutdown func()) {
)
}

// waitMainEngineInitialized waits until the main engine is initialized.
func (p *Protocol) waitMainEngineInitialized() {
// waitInitialized waits until the main engine is initialized (published its root commitment).
func (p *Protocol) waitInitialized() {
var waitInitialized sync.WaitGroup

waitInitialized.Add(1)
p.Engines.Main.OnUpdateOnce(func(_ *engine.Engine, engine *engine.Engine) {
engine.Initialized.OnTrigger(waitInitialized.Done)
})
p.Commitments.Root.OnUpdateOnce(func(_ *Commitment, _ *Commitment) {
waitInitialized.Done()
}, func(_ *Commitment, rootCommitment *Commitment) bool { return rootCommitment != nil })

waitInitialized.Wait()
}
2 changes: 1 addition & 1 deletion pkg/protocol/protocol_attestations.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type AttestationsProtocol struct {
// commitmentVerifiers contains the commitment verifiers that are used to verify received attestations.
commitmentVerifiers *shrinkingmap.ShrinkingMap[iotago.CommitmentID, *CommitmentVerifier]

// Logger embeds a logger that can be used to log messages emitted by this chain.
// Logger embeds a logger that can be used to log messages emitted by this component.
log.Logger
}

Expand Down
21 changes: 6 additions & 15 deletions pkg/protocol/protocol_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,12 @@ func newBlocksProtocol(protocol *Protocol) *BlocksProtocol {
})
})

protocol.Chains.WithElements(func(chain *Chain) func() {
return chain.Engine.WithNonEmptyValue(func(engineInstance *engine.Engine) (shutdown func()) {
return engineInstance.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook
})
})

protocol.Chains.Main.Get().Engine.OnUpdateWithContext(func(_ *engine.Engine, engine *engine.Engine, unsubscribeOnEngineChange func(subscriptionFactory func() (unsubscribe func()))) {
if engine != nil {
unsubscribeOnEngineChange(func() (unsubscribe func()) {
return lo.Batch(
engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
)
})
}
protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) {
return lo.Batch(
engine.Events.BlockRequester.Tick.Hook(b.SendRequest).Unhook,
engine.Events.Scheduler.BlockScheduled.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
jonastheis marked this conversation as resolved.
Show resolved Hide resolved
engine.Events.Scheduler.BlockSkipped.Hook(func(block *blocks.Block) { b.SendResponse(block.ModelBlock()) }).Unhook,
)
})
})

Expand Down
Loading
Loading