From f5606140358b729b4e3a1d174a4dfd6928e1ecdb Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 30 Oct 2023 01:10:49 +0100 Subject: [PATCH] Fix: fix more stuff --- pkg/protocol/chain.go | 12 +++-- pkg/protocol/commitment.go | 92 +++++++++++++++++++++------------ pkg/protocol/protocol_blocks.go | 12 ++--- 3 files changed, 71 insertions(+), 45 deletions(-) diff --git a/pkg/protocol/chain.go b/pkg/protocol/chain.go index 499d6b0e9..7f9edc09f 100644 --- a/pkg/protocol/chain.go +++ b/pkg/protocol/chain.go @@ -170,11 +170,13 @@ func (c *Chain) initClaimedWeight() { func (c *Chain) initAttestedWeight() { c.LatestAttestedCommitment.OnUpdateWithContext(func(_, latestAttestedCommitment *Commitment, unsubscribeOnUpdate func(subscriptionFactory func() (unsubscribe func()))) { - setupInheritance := func() func() { - return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight) - } + if latestAttestedCommitment != nil { + setupInheritance := func() func() { + return c.AttestedWeight.InheritFrom(latestAttestedCommitment.CumulativeAttestedWeight) + } - unsubscribeOnUpdate(setupInheritance) + unsubscribeOnUpdate(setupInheritance) + } }) } @@ -270,7 +272,7 @@ func (c *Chain) DispatchBlock(block *model.Block, src peer.ID) (success bool) { for _, chain := range append([]*Chain{c}, c.ChildChains.ToSlice()...) { if chain.VerifyState.Get() { - if targetEngine := chain.SpawnedEngine.Get(); targetEngine != nil && !chain.WarpSync.Get() || targetEngine.BlockRequester.HasTicker(block.ID()) { + if targetEngine := chain.Engine.Get(); targetEngine != nil && !chain.WarpSync.Get() || targetEngine.BlockRequester.HasTicker(block.ID()) { targetEngine.ProcessBlockFromPeer(block, src) success = true diff --git a/pkg/protocol/commitment.go b/pkg/protocol/commitment.go index 1fe0b6746..b7e6a66ba 100644 --- a/pkg/protocol/commitment.go +++ b/pkg/protocol/commitment.go @@ -14,23 +14,25 @@ import ( type Commitment struct { *model.Commitment - Parent reactive.Variable[*Commitment] - Children reactive.Set[*Commitment] - MainChild reactive.Variable[*Commitment] - SpawnedChain reactive.Variable[*Chain] - Chain reactive.Variable[*Chain] - Engine reactive.Variable[*engine.Engine] - RequestAttestations reactive.Variable[bool] - RequestBlocks reactive.Variable[bool] - RequestedBlocksReceived reactive.Variable[bool] - Weight reactive.Variable[uint64] - AttestedWeight reactive.Variable[uint64] - CumulativeAttestedWeight reactive.Variable[uint64] - IsSolid reactive.Event - IsAttested reactive.Event - IsVerified reactive.Event - IsRoot reactive.Event - IsEvicted reactive.Event + Parent reactive.Variable[*Commitment] + Children reactive.Set[*Commitment] + MainChild reactive.Variable[*Commitment] + SpawnedChain reactive.Variable[*Chain] + Chain reactive.Variable[*Chain] + Engine reactive.Variable[*engine.Engine] + RequestAttestations reactive.Variable[bool] + RequestBlocks reactive.Variable[bool] + RequestedBlocksReceived reactive.Variable[bool] + Weight reactive.Variable[uint64] + AttestedWeight reactive.Variable[uint64] + CumulativeAttestedWeight reactive.Variable[uint64] + IsSolid reactive.Event + IsAttested reactive.Event + IsVerified reactive.Event + IsRoot reactive.Event + IsEvicted reactive.Event + IsAboveLatestVerifiedCommitment reactive.Variable[bool] + ReplayBlocks reactive.Variable[bool] protocol *Protocol isDirectlyAboveLatestAttestedCommitment reactive.Variable[bool] @@ -45,23 +47,25 @@ func NewCommitment(commitment *model.Commitment, protocol *Protocol) *Commitment c := &Commitment{ Commitment: commitment, - Parent: reactive.NewVariable[*Commitment](), - MainChild: reactive.NewVariable[*Commitment](), - Children: reactive.NewSet[*Commitment](), - SpawnedChain: reactive.NewVariable[*Chain](), - Chain: reactive.NewVariable[*Chain](), - Engine: reactive.NewVariable[*engine.Engine](), - RequestAttestations: reactive.NewVariable[bool](), - RequestBlocks: reactive.NewVariable[bool](), - RequestedBlocksReceived: reactive.NewVariable[bool](), - Weight: reactive.NewVariable[uint64](), - AttestedWeight: reactive.NewVariable[uint64](func(currentValue uint64, newValue uint64) uint64 { return max(currentValue, newValue) }), - CumulativeAttestedWeight: reactive.NewVariable[uint64](), - IsSolid: reactive.NewEvent(), - IsAttested: reactive.NewEvent(), - IsVerified: reactive.NewEvent(), - IsRoot: reactive.NewEvent(), - IsEvicted: reactive.NewEvent(), + Parent: reactive.NewVariable[*Commitment](), + MainChild: reactive.NewVariable[*Commitment](), + Children: reactive.NewSet[*Commitment](), + SpawnedChain: reactive.NewVariable[*Chain](), + Chain: reactive.NewVariable[*Chain](), + Engine: reactive.NewVariable[*engine.Engine](), + RequestAttestations: reactive.NewVariable[bool](), + RequestBlocks: reactive.NewVariable[bool](), + RequestedBlocksReceived: reactive.NewVariable[bool](), + Weight: reactive.NewVariable[uint64](), + AttestedWeight: reactive.NewVariable[uint64](func(currentValue uint64, newValue uint64) uint64 { return max(currentValue, newValue) }), + CumulativeAttestedWeight: reactive.NewVariable[uint64](), + IsSolid: reactive.NewEvent(), + IsAttested: reactive.NewEvent(), + IsVerified: reactive.NewEvent(), + IsRoot: reactive.NewEvent(), + IsEvicted: reactive.NewEvent(), + IsAboveLatestVerifiedCommitment: reactive.NewVariable[bool](), + ReplayBlocks: reactive.NewVariable[bool](), protocol: protocol, isDirectlyAboveLatestAttestedCommitment: reactive.NewVariable[bool](), @@ -70,6 +74,26 @@ func NewCommitment(commitment *model.Commitment, protocol *Protocol) *Commitment isBelowWarpSyncThreshold: reactive.NewEvent(), } + c.Parent.OnUpdateWithContext(func(_, parent *Commitment, unsubscribeOnUpdate func(subscriptionFactory func() (unsubscribe func()))) { + if parent != nil { + unsubscribeOnUpdate(func() func() { + return c.IsAboveLatestVerifiedCommitment.InheritFrom(reactive.NewDerivedVariable2(func(parentAboveLatestVerifiedCommitment, directlyAboveLatestVerifiedCommitment bool) bool { + return parentAboveLatestVerifiedCommitment || directlyAboveLatestVerifiedCommitment + }, parent.IsAboveLatestVerifiedCommitment, c.isDirectlyAboveLatestVerifiedCommitment)) + }) + } + }) + + c.Chain.OnUpdateWithContext(func(_, chain *Chain, unsubscribeOnUpdate func(subscriptionFactory func() (unsubscribe func()))) { + if chain != nil { + unsubscribeOnUpdate(func() func() { + return c.ReplayBlocks.InheritFrom(reactive.NewDerivedVariable3(func(spawnedEngine *engine.Engine, warpSyncing, isAboveLatestVerifiedCommitment bool) bool { + return spawnedEngine != nil && !warpSyncing && isAboveLatestVerifiedCommitment + }, chain.SpawnedEngine, chain.WarpSync, c.IsAboveLatestVerifiedCommitment)) + }) + } + }) + c.Parent.OnUpdateOnce(func(_, parent *Commitment) { parent.registerChild(c) diff --git a/pkg/protocol/protocol_blocks.go b/pkg/protocol/protocol_blocks.go index 748657a18..12a30335e 100644 --- a/pkg/protocol/protocol_blocks.go +++ b/pkg/protocol/protocol_blocks.go @@ -33,13 +33,13 @@ func NewBlocksProtocol(protocol *Protocol) *BlocksProtocol { protocol.Constructed.OnTrigger(func() { protocol.CommitmentCreated.Hook(func(commitment *Commitment) { - commitment.isDirectlyAboveLatestVerifiedCommitment.OnUpdate(func(_, isDirectlyAboveLatestVerifiedCommitment bool) { - if !isDirectlyAboveLatestVerifiedCommitment { - return - } + commitment.ReplayBlocks.OnUpdate(func(_, replayBlocks bool) { + if replayBlocks { + b.LogDebug("replaying blocks", "commitmentID", commitment.ID()) - for _, droppedBlock := range b.droppedBlocksBuffer.GetValues(commitment.ID()) { - b.ProcessResponse(droppedBlock.A, droppedBlock.B) + for _, droppedBlock := range b.droppedBlocksBuffer.GetValues(commitment.ID()) { + b.ProcessResponse(droppedBlock.A, droppedBlock.B) + } } }) })