From 647d023facfacda19a34a7a25e8947e634a8b7fa Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Mon, 30 Oct 2023 20:45:15 +0100 Subject: [PATCH] Feat: started experimenting with definitions framework --- pkg/protocol/commitment.go | 204 ++++++++++++++++++++--------- pkg/protocol/protocol_blocks.go | 2 +- pkg/protocol/protocol_warp_sync.go | 4 +- 3 files changed, 148 insertions(+), 62 deletions(-) diff --git a/pkg/protocol/commitment.go b/pkg/protocol/commitment.go index 0ccd19379..b877c6407 100644 --- a/pkg/protocol/commitment.go +++ b/pkg/protocol/commitment.go @@ -11,8 +11,6 @@ import ( ) type Commitment struct { - *model.Commitment - Parent reactive.Variable[*Commitment] Children reactive.Set[*Commitment] MainChild reactive.Variable[*Commitment] @@ -20,7 +18,7 @@ type Commitment struct { Chain reactive.Variable[*Chain] Engine reactive.Variable[*engine.Engine] RequestAttestations reactive.Variable[bool] - RequestBlocks reactive.Variable[bool] + WarpSync reactive.Variable[bool] RequestedBlocksReceived reactive.Variable[bool] Weight reactive.Variable[uint64] AttestedWeight reactive.Variable[uint64] @@ -31,19 +29,18 @@ type Commitment struct { IsRoot reactive.Event IsEvicted reactive.Event IsAboveLatestVerifiedCommitment reactive.Variable[bool] - ReplayBlocks reactive.Variable[bool] + InSyncRange reactive.Variable[bool] protocol *Protocol isDirectlyAboveLatestAttestedCommitment reactive.Variable[bool] - isDirectlyAboveLatestVerifiedCommitment reactive.Variable[bool] + *model.Commitment log.Logger } func NewCommitment(commitment *model.Commitment, protocol *Protocol) *Commitment { c := &Commitment{ - Commitment: commitment, - + Commitment: commitment, Parent: reactive.NewVariable[*Commitment](), MainChild: reactive.NewVariable[*Commitment](), Children: reactive.NewSet[*Commitment](), @@ -51,7 +48,7 @@ func NewCommitment(commitment *model.Commitment, protocol *Protocol) *Commitment Chain: reactive.NewVariable[*Chain](), Engine: reactive.NewVariable[*engine.Engine](), RequestAttestations: reactive.NewVariable[bool](), - RequestBlocks: reactive.NewVariable[bool](), + WarpSync: reactive.NewVariable[bool](), RequestedBlocksReceived: reactive.NewVariable[bool](), Weight: reactive.NewVariable[uint64](), AttestedWeight: reactive.NewVariable[uint64](func(currentValue uint64, newValue uint64) uint64 { return max(currentValue, newValue) }), @@ -62,63 +59,38 @@ func NewCommitment(commitment *model.Commitment, protocol *Protocol) *Commitment IsRoot: reactive.NewEvent(), IsEvicted: reactive.NewEvent(), IsAboveLatestVerifiedCommitment: reactive.NewVariable[bool](), - ReplayBlocks: reactive.NewVariable[bool](), + InSyncRange: reactive.NewVariable[bool](), protocol: protocol, isDirectlyAboveLatestAttestedCommitment: reactive.NewVariable[bool](), - isDirectlyAboveLatestVerifiedCommitment: reactive.NewVariable[bool](), } - c.Parent.OnUpdateWithContext(func(_, parent *Commitment, unsubscribeOnUpdate func(subscriptionFactory func() (unsubscribe func()))) { - if parent != nil { - unsubscribeOnUpdate(func() func() { - return c.IsAboveLatestVerifiedCommitment.InheritFrom(reactive.NewDerivedVariable2(func(parentAboveLatestVerifiedCommitment, directlyAboveLatestVerifiedCommitment bool) bool { - return parentAboveLatestVerifiedCommitment || directlyAboveLatestVerifiedCommitment - }, parent.IsAboveLatestVerifiedCommitment, c.isDirectlyAboveLatestVerifiedCommitment)) - }) - } - }) - - c.Chain.OnUpdateWithContext(func(_, chain *Chain, unsubscribeOnUpdate func(subscriptionFactory func() (unsubscribe func()))) { - if chain != nil { - unsubscribeOnUpdate(func() func() { - return c.ReplayBlocks.InheritFrom(reactive.NewDerivedVariable3(func(spawnedEngine *engine.Engine, warpSyncing, isAboveLatestVerifiedCommitment bool) bool { - return spawnedEngine != nil && !warpSyncing && isAboveLatestVerifiedCommitment - }, chain.SpawnedEngine, chain.WarpSync, c.IsAboveLatestVerifiedCommitment)) - }) - } - }) - - c.Parent.OnUpdateOnce(func(_, parent *Commitment) { - parent.registerChild(c) - - c.Chain.InheritFrom(reactive.NewDerivedVariable2(func(parentChain, spawnedChain *Chain) *Chain { - if spawnedChain != nil { - return spawnedChain - } + WithDependency(c.Parent, + DynamicValueWithDependency(c.chain, c.Chain), + DynamicValueWithDependency(c.isSolid, c.IsSolid), + DynamicValueWithDependency(c.cumulativeAttestedWeight, c.CumulativeAttestedWeight), + DynamicValueWithDependency(c.isAboveLatestVerifiedCommitment, c.IsAboveLatestVerifiedCommitment), + DefinitionWithStaticValue(c.weight, c.Weight), + ) - return parentChain - }, parent.Chain, c.SpawnedChain)) + WithDependency(c.Chain, + DynamicValueWithDependency(c.inSyncRange, c.InSyncRange), + ) - c.IsSolid.InheritFrom(parent.IsSolid) + WithDependencies2(c.Parent, c.Chain, + DynamicValueWith2Dependencies(c.warpSync, c.WarpSync), + ) - c.Weight.Set(c.CumulativeWeight() - parent.CumulativeWeight()) + c.Parent.OnUpdateOnce(func(_, parent *Commitment) { + if parent == nil { + return + } - c.IsAttested.OnTrigger(func() { - parent.IsAttested.OnTrigger(func() { - c.CumulativeAttestedWeight.InheritFrom(reactive.NewDerivedVariable2(func(parentCumulativeAttestedWeight, attestedWeight uint64) uint64 { - return parentCumulativeAttestedWeight + attestedWeight - }, parent.CumulativeAttestedWeight, c.AttestedWeight)) - }) - }) + parent.registerChild(c) c.isDirectlyAboveLatestAttestedCommitment.InheritFrom(reactive.NewDerivedVariable2(func(parentIsAttested, isAttested bool) bool { return parentIsAttested && !isAttested }, parent.IsAttested, c.IsAttested)) - - c.isDirectlyAboveLatestVerifiedCommitment.InheritFrom(reactive.NewDerivedVariable2(func(parentIsVerified, isVerified bool) bool { - return parentIsVerified && !isVerified - }, parent.IsVerified, c.IsVerified)) }) c.Chain.OnUpdateWithContext(func(_, chain *Chain, withinContext func(subscriptionFactory func() (unsubscribe func()))) { @@ -138,24 +110,18 @@ func NewCommitment(commitment *model.Commitment, protocol *Protocol) *Commitment c.Engine.InheritFrom(chain.Engine), - c.RequestBlocks.InheritFrom(reactive.NewDerivedVariable3(func(spawnedEngine *engine.Engine, warpSyncChain, isDirectlyAboveLatestVerifiedCommitment bool) bool { - return spawnedEngine != nil && warpSyncChain && isDirectlyAboveLatestVerifiedCommitment - }, chain.SpawnedEngine, chain.WarpSync, c.isDirectlyAboveLatestVerifiedCommitment)), - requestAttestations.Unsubscribe, ) }) }) c.IsRoot.OnTrigger(func() { - c.IsSolid.Set(true) c.IsAttested.Set(true) c.IsVerified.Set(true) }) c.Logger = protocol.NewEntityLogger(fmt.Sprintf("Slot%d.", commitment.Slot()), c.IsEvicted, func(entityLogger log.Logger) { - c.isDirectlyAboveLatestVerifiedCommitment.LogUpdates(entityLogger, log.LevelTrace, "isDirectlyAboveLatestVerifiedCommitment") - c.RequestBlocks.LogUpdates(entityLogger, log.LevelTrace, "RequestBlocks") + c.WarpSync.LogUpdates(entityLogger, log.LevelTrace, "WarpSync") c.Weight.LogUpdates(entityLogger, log.LevelTrace, "Weight") c.AttestedWeight.LogUpdates(entityLogger, log.LevelTrace, "AttestedWeight") c.CumulativeAttestedWeight.LogUpdates(entityLogger, log.LevelTrace, "CumulativeAttestedWeight") @@ -164,6 +130,54 @@ func NewCommitment(commitment *model.Commitment, protocol *Protocol) *Commitment return c } +func (c *Commitment) isAboveLatestVerifiedCommitment(parent *Commitment) reactive.DerivedVariable[bool] { + return reactive.NewDerivedVariable3(func(parentAboveLatestVerifiedCommitment, parentIsVerified, isVerified bool) bool { + return parentAboveLatestVerifiedCommitment || (parentIsVerified && isVerified) + }, parent.IsAboveLatestVerifiedCommitment, parent.IsVerified, c.IsVerified) +} + +func (c *Commitment) inSyncRange(chain *Chain) reactive.DerivedVariable[bool] { + return reactive.NewDerivedVariable3(func(spawnedEngine *engine.Engine, warpSyncing, isAboveLatestVerifiedCommitment bool) bool { + return spawnedEngine != nil && !warpSyncing && isAboveLatestVerifiedCommitment + }, chain.SpawnedEngine, chain.WarpSync, c.IsAboveLatestVerifiedCommitment) +} + +func (c *Commitment) chain(parent *Commitment) reactive.DerivedVariable[*Chain] { + return reactive.NewDerivedVariable2(func(parentChain, spawnedChain *Chain) *Chain { + if spawnedChain != nil { + return spawnedChain + } + + return parentChain + }, parent.Chain, c.SpawnedChain) +} + +func (c *Commitment) cumulativeAttestedWeight(parent *Commitment) reactive.DerivedVariable[uint64] { + return reactive.NewDerivedVariable3(func(isAttested bool, parentCumulativeAttestedWeight, attestedWeight uint64) uint64 { + if !isAttested { + return 0 + } + + return parentCumulativeAttestedWeight + attestedWeight + }, c.IsAttested, parent.CumulativeAttestedWeight, c.AttestedWeight) +} + +func (c *Commitment) isSolid(parent *Commitment) reactive.DerivedVariable[bool] { + return reactive.NewDerivedVariable2(func(isRoot, parentIsSolid bool) bool { + return isRoot || parentIsSolid + }, c.IsRoot, parent.IsSolid) +} + +func (c *Commitment) warpSync(parent *Commitment, chain *Chain) reactive.DerivedVariable[bool] { + return reactive.NewDerivedVariable4(func(spawnedEngine *engine.Engine, warpSync, parentIsVerified, isVerified bool) bool { + return spawnedEngine != nil && warpSync && parentIsVerified && !isVerified + }, chain.SpawnedEngine, chain.WarpSync, parent.IsVerified, c.IsVerified) +} + +func (c *Commitment) weight(parent *Commitment) uint64 { + return c.CumulativeWeight() - parent.CumulativeWeight() +} + func (c *Commitment) registerChild(child *Commitment) { if c.Children.Add(child) { c.MainChild.Compute(func(currentMainChild *Commitment) *Commitment { @@ -233,3 +247,75 @@ func (c *Commitment) promote(targetChain *Chain) { } } } + +func WithDependency[S comparable](source reactive.Variable[S], dependencyReceiver ...func(S) func()) (unsubscribe func()) { + unsubscribeAll := make([]func(), 0) + + return source.OnUpdateWithContext(func(_, parent S, unsubscribeOnParentUpdate func(subscriptionFactory func() (unsubscribe func()))) { + if parent == *new(S) { + return + } + + unsubscribeOnParentUpdate(func() (unsubscribe func()) { + for _, dependency := range dependencyReceiver { + if unsubscribeDependency := dependency(parent); unsubscribeDependency != nil { + unsubscribeAll = append(unsubscribeAll, unsubscribeDependency) + } + } + + return lo.Batch(unsubscribeAll...) + }) + }) +} + +func WithDependencies2[S1, S2 comparable](source1 reactive.Variable[S1], source2 reactive.Variable[S2], dependencyReceiver ...func(S1, S2) func()) (unsubscribe func()) { + unsubscribeAll := make([]func(), 0) + + return source1.OnUpdateWithContext(func(_, source1 S1, unsubscribeOnParentUpdate func(subscriptionFactory func() (unsubscribe func()))) { + if source1 == *new(S1) { + return + } + + unsubscribeOnParentUpdate(func() (unsubscribe func()) { + return source2.OnUpdateWithContext(func(_, source2 S2, unsubscribeOnParentUpdate func(subscriptionFactory func() (unsubscribe func()))) { + if source2 == *new(S2) { + return + } + + unsubscribeOnParentUpdate(func() (unsubscribe func()) { + for _, dependency := range dependencyReceiver { + if unsubscribeDependency := dependency(source1, source2); unsubscribeDependency != nil { + unsubscribeAll = append(unsubscribeAll, unsubscribeDependency) + } + } + + return lo.Batch(unsubscribeAll...) + }) + }) + }) + }) +} + +func DynamicValueWithDependency[T, S comparable](definition func(S) reactive.DerivedVariable[T], target reactive.Variable[T]) func(parent S) func() { + return func(parent S) func() { + derivedVariable := definition(parent) + + return lo.Batch(target.InheritFrom(derivedVariable), derivedVariable.Unsubscribe) + } +} + +func DynamicValueWith2Dependencies[T, S1, S2 comparable](definition func(S1, S2) reactive.DerivedVariable[T], target reactive.Variable[T]) func(S1, S2) func() { + return func(source1 S1, source2 S2) func() { + derivedVariable := definition(source1, source2) + + return lo.Batch(target.InheritFrom(derivedVariable), derivedVariable.Unsubscribe) + } +} + +func DefinitionWithStaticValue[T, S comparable](definition func(S) T, target reactive.Variable[T]) func(parent S) func() { + return func(parent S) func() { + target.Set(definition(parent)) + + return nil + } +} diff --git a/pkg/protocol/protocol_blocks.go b/pkg/protocol/protocol_blocks.go index 12a30335e..8bdab8c79 100644 --- a/pkg/protocol/protocol_blocks.go +++ b/pkg/protocol/protocol_blocks.go @@ -33,7 +33,7 @@ func NewBlocksProtocol(protocol *Protocol) *BlocksProtocol { protocol.Constructed.OnTrigger(func() { protocol.CommitmentCreated.Hook(func(commitment *Commitment) { - commitment.ReplayBlocks.OnUpdate(func(_, replayBlocks bool) { + commitment.InSyncRange.OnUpdate(func(_, replayBlocks bool) { if replayBlocks { b.LogDebug("replaying blocks", "commitmentID", commitment.ID()) diff --git a/pkg/protocol/protocol_warp_sync.go b/pkg/protocol/protocol_warp_sync.go index a0a938317..96b18b252 100644 --- a/pkg/protocol/protocol_warp_sync.go +++ b/pkg/protocol/protocol_warp_sync.go @@ -36,7 +36,7 @@ func NewWarpSyncProtocol(protocol *Protocol) *WarpSyncProtocol { protocol.Constructed.OnTrigger(func() { c.protocol.CommitmentCreated.Hook(func(commitment *Commitment) { - commitment.RequestBlocks.OnUpdate(func(_, warpSyncBlocks bool) { + commitment.WarpSync.OnUpdate(func(_, warpSyncBlocks bool) { if warpSyncBlocks { c.ticker.StartTicker(commitment.ID()) } else { @@ -101,7 +101,7 @@ func (w *WarpSyncProtocol) ProcessResponse(commitmentID iotago.CommitmentID, blo } commitment.RequestedBlocksReceived.Compute(func(requestedBlocksReceived bool) bool { - if requestedBlocksReceived || !commitment.RequestBlocks.Get() { + if requestedBlocksReceived || !commitment.WarpSync.Get() { w.LogTrace("response for already synced commitment", "commitment", commitment.LogName(), "fromPeer", from) return requestedBlocksReceived