Skip to content

Commit

Permalink
Feat: factored out eviction
Browse files Browse the repository at this point in the history
  • Loading branch information
hmoog committed Aug 21, 2023
1 parent 26caa3e commit 33436b4
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 76 deletions.
20 changes: 20 additions & 0 deletions pkg/protocol/chainmanagerv1/chain.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package chainmanagerv1

import (
"fmt"

"github.com/iotaledger/hive.go/ds/reactive"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
"github.com/iotaledger/hive.go/lo"
Expand Down Expand Up @@ -34,6 +36,8 @@ type Chain struct {
// warpSyncThreshold defines a lower bound for where the warp sync process starts (to not requests slots that we are
// about to commit ourselves once we are in sync).
warpSyncThreshold reactive.Variable[iotago.SlotIndex]

cumulativeWeight reactive.Variable[uint64]
}

func NewChain(forkingPoint *CommitmentMetadata) *Chain {
Expand All @@ -55,9 +59,25 @@ func NewChain(forkingPoint *CommitmentMetadata) *Chain {
return latestCommitmentIndex - WarpSyncOffset
}, c.latestCommitmentIndex)

c.cumulativeWeight = reactive.NewDerivedVariable[uint64](func(latestCommitmentIndex iotago.SlotIndex) uint64 {
if latestCommitment, exists := c.commitments.Get(latestCommitmentIndex); exists {
return latestCommitment.CumulativeWeight()
} else {
panic(fmt.Sprintf("latest commitment with index %d does not exist", latestCommitmentIndex))
}
}, c.latestCommitmentIndex)

return c
}

func (c *Chain) ForkingPoint() reactive.Variable[*CommitmentMetadata] {
return c.forkingPoint
}

func (c *Chain) LatestCommitmentIndex() reactive.Variable[iotago.SlotIndex] {
return c.latestCommitmentIndex
}

func (c *Chain) LatestVerifiedCommitmentIndex() reactive.Variable[iotago.SlotIndex] {
return c.latestVerifiedCommitmentIndex
}
Expand Down
96 changes: 25 additions & 71 deletions pkg/protocol/chainmanagerv1/chainmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,34 @@ type ChainManager struct {

commitmentRequester *eventticker.EventTicker[iotago.SlotIndex, iotago.CommitmentID]

slotEvictionEvents *shrinkingmap.ShrinkingMap[iotago.SlotIndex, reactive.Event]

lastEvictedSlotIndex reactive.Variable[iotago.SlotIndex]
*SlotEviction
}

func NewChainManager() *ChainManager {
return &ChainManager{
rootCommitment: reactive.NewVariable[*CommitmentMetadata](),
commitmentCreated: event.New1[*CommitmentMetadata](),
cachedCommitments: shrinkingmap.New[iotago.CommitmentID, *promise.Promise[*CommitmentMetadata]](),
commitmentRequester: eventticker.New[iotago.SlotIndex, iotago.CommitmentID](),
slotEvictionEvents: shrinkingmap.New[iotago.SlotIndex, reactive.Event](),
lastEvictedSlotIndex: reactive.NewVariable[iotago.SlotIndex](),
rootCommitment: reactive.NewVariable[*CommitmentMetadata](),
commitmentCreated: event.New1[*CommitmentMetadata](),
cachedCommitments: shrinkingmap.New[iotago.CommitmentID, *promise.Promise[*CommitmentMetadata]](),
commitmentRequester: eventticker.New[iotago.SlotIndex, iotago.CommitmentID](),

SlotEviction: NewSlotEviction(),
}
}

func (c *ChainManager) ProcessCommitment(commitment *model.Commitment) (commitmentMetadata *CommitmentMetadata) {
if commitmentRequest, _ := c.requestCommitment(commitment.ID(), commitment.Index(), false, func(resolvedMetadata *CommitmentMetadata) {
commitmentMetadata = resolvedMetadata
}); commitmentRequest != nil {
commitmentRequest.Resolve(NewCommitmentMetadata(commitment))
}

return commitmentMetadata
}

func (c *ChainManager) OnCommitmentCreated(callback func(commitment *CommitmentMetadata)) (unsubscribe func()) {
return c.commitmentCreated.Hook(callback).Unhook
}

func (c *ChainManager) SetRootCommitment(commitment *model.Commitment) (commitmentMetadata *CommitmentMetadata, err error) {
c.rootCommitment.Compute(func(currentRoot *CommitmentMetadata) *CommitmentMetadata {
if currentRoot != nil {
Expand Down Expand Up @@ -74,58 +86,6 @@ func (c *ChainManager) SetRootCommitment(commitment *model.Commitment) (commitme
return commitmentMetadata, nil
}

func (c *ChainManager) ProcessCommitment(commitment *model.Commitment) (commitmentMetadata *CommitmentMetadata) {
if commitmentRequest, _ := c.requestCommitment(commitment.ID(), commitment.Index(), false, func(resolvedMetadata *CommitmentMetadata) {
commitmentMetadata = resolvedMetadata
}); commitmentRequest != nil {
commitmentRequest.Resolve(NewCommitmentMetadata(commitment))
}

return commitmentMetadata
}

func (c *ChainManager) OnCommitmentCreated(callback func(commitment *CommitmentMetadata)) (unsubscribe func()) {
return c.commitmentCreated.Hook(callback).Unhook
}

func (c *ChainManager) SlotEvictedEvent(index iotago.SlotIndex) reactive.Event {
var slotEvictedEvent reactive.Event

c.lastEvictedSlotIndex.Compute(func(lastEvictedSlotIndex iotago.SlotIndex) iotago.SlotIndex {
if index > lastEvictedSlotIndex {
slotEvictedEvent, _ = c.slotEvictionEvents.GetOrCreate(index, reactive.NewEvent)
} else {
slotEvictedEvent = defaultTriggeredEvent
}

return lastEvictedSlotIndex
})

return slotEvictedEvent
}

func (c *ChainManager) Evict(slotIndex iotago.SlotIndex) {
slotEvictedEventsToTrigger := make([]reactive.Event, 0)

c.lastEvictedSlotIndex.Compute(func(lastEvictedSlotIndex iotago.SlotIndex) iotago.SlotIndex {
if slotIndex <= lastEvictedSlotIndex {
return lastEvictedSlotIndex
}

for i := lastEvictedSlotIndex + 1; i <= slotIndex; i++ {
if slotEvictedEvent, exists := c.slotEvictionEvents.Get(i); exists {
slotEvictedEventsToTrigger = append(slotEvictedEventsToTrigger, slotEvictedEvent)
}
}

return slotIndex
})

for _, slotEvictedEvent := range slotEvictedEventsToTrigger {
slotEvictedEvent.Trigger()
}
}

func (c *ChainManager) setupCommitment(commitment *CommitmentMetadata, slotEvictedEvent reactive.Event) {
c.requestCommitment(commitment.PrevID(), commitment.Index()-1, true, commitment.registerParent)

Expand All @@ -135,8 +95,8 @@ func (c *ChainManager) setupCommitment(commitment *CommitmentMetadata, slotEvict
}

func (c *ChainManager) requestCommitment(id iotago.CommitmentID, index iotago.SlotIndex, requestIfMissing bool, optSuccessCallbacks ...func(metadata *CommitmentMetadata)) (commitmentRequest *promise.Promise[*CommitmentMetadata], requestCreated bool) {
slotEvictedEvent := c.SlotEvictedEvent(index)
if slotEvictedEvent.WasTriggered() {
slotEvicted := c.EvictedEvent(index)
if slotEvicted.WasTriggered() {
if rootCommitment := c.rootCommitment.Get(); rootCommitment != nil && id == rootCommitment.ID() {
for _, successCallback := range optSuccessCallbacks {
successCallback(rootCommitment)
Expand All @@ -158,10 +118,10 @@ func (c *ChainManager) requestCommitment(id iotago.CommitmentID, index iotago.Sl
c.commitmentRequester.StopTicker(commitment.ID())
}

c.setupCommitment(commitment, slotEvictedEvent)
c.setupCommitment(commitment, slotEvicted)
})

slotEvictedEvent.OnTrigger(func() { c.cachedCommitments.Delete(id) })
slotEvicted.OnTrigger(func() { c.cachedCommitments.Delete(id) })
}

for _, successCallback := range optSuccessCallbacks {
Expand All @@ -170,9 +130,3 @@ func (c *ChainManager) requestCommitment(id iotago.CommitmentID, index iotago.Sl

return commitmentRequest, requestCreated
}

var defaultTriggeredEvent = reactive.NewEvent()

func init() {
defaultTriggeredEvent.Trigger()
}
3 changes: 2 additions & 1 deletion pkg/protocol/chainmanagerv1/chainmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestChainManager(t *testing.T) {
commitment2.Index()+1,
commitment2.ID(),
commitment2.RootsID(),
2,
3,
2,
), testAPI)))

Expand All @@ -56,4 +56,5 @@ func TestChainManager(t *testing.T) {
require.True(t, commitment3Metadata.BelowSyncThreshold().Get())

require.Equal(t, iotago.SlotIndex(3), commitment3Metadata.Chain().Get().latestCommitmentIndex.Get())
require.Equal(t, uint64(3), commitment3Metadata.Chain().Get().cumulativeWeight.Get())
}
4 changes: 0 additions & 4 deletions pkg/protocol/chainmanagerv1/commitment_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,6 @@ func (c *CommitmentMetadata) registerChild(newChild *CommitmentMetadata, onSucce
c.evicted.OnTrigger(c.chainSuccessor.OnUpdate(onSuccessorUpdated))
}

// inheritChain returns a function that implements the chain inheritance rules.
//
// It must be called whenever the successor of the parent changes as we spawn a new chain for each child that is not the
// direct successor of a parent, and we inherit its chain otherwise.
func (c *CommitmentMetadata) inheritChain(parent *CommitmentMetadata) func(*CommitmentMetadata, *CommitmentMetadata) {
var spawnedChain *Chain
spawnChain := func() {
Expand Down
66 changes: 66 additions & 0 deletions pkg/protocol/chainmanagerv1/eviction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package chainmanagerv1

import (
"github.com/iotaledger/hive.go/ds/reactive"
"github.com/iotaledger/hive.go/ds/shrinkingmap"
iotago "github.com/iotaledger/iota.go/v4"
)

type SlotEviction struct {
evictionEvents *shrinkingmap.ShrinkingMap[iotago.SlotIndex, reactive.Event]

lastEvictedSlotIndex reactive.Variable[iotago.SlotIndex]
}

func NewSlotEviction() *SlotEviction {
return &SlotEviction{
evictionEvents: shrinkingmap.New[iotago.SlotIndex, reactive.Event](),
lastEvictedSlotIndex: reactive.NewVariable[iotago.SlotIndex](),
}
}

func (c *SlotEviction) LastEvictedSlotIndex() reactive.Variable[iotago.SlotIndex] {
return c.lastEvictedSlotIndex
}

func (c *SlotEviction) EvictedEvent(index iotago.SlotIndex) reactive.Event {
slotEvictedEvent := defaultTriggeredEvent

c.lastEvictedSlotIndex.Compute(func(lastEvictedSlotIndex iotago.SlotIndex) iotago.SlotIndex {
if index > lastEvictedSlotIndex {
slotEvictedEvent, _ = c.evictionEvents.GetOrCreate(index, reactive.NewEvent)
}

return lastEvictedSlotIndex
})

return slotEvictedEvent
}

func (c *SlotEviction) Evict(slotIndex iotago.SlotIndex) {
slotEvictedEventsToTrigger := make([]reactive.Event, 0)

c.lastEvictedSlotIndex.Compute(func(lastEvictedSlotIndex iotago.SlotIndex) iotago.SlotIndex {
if slotIndex <= lastEvictedSlotIndex {
return lastEvictedSlotIndex
}

for i := lastEvictedSlotIndex + 1; i <= slotIndex; i++ {
if slotEvictedEvent, exists := c.evictionEvents.Get(i); exists {
slotEvictedEventsToTrigger = append(slotEvictedEventsToTrigger, slotEvictedEvent)
}
}

return slotIndex
})

for _, slotEvictedEvent := range slotEvictedEventsToTrigger {
slotEvictedEvent.Trigger()
}
}

var defaultTriggeredEvent = reactive.NewEvent()

func init() {
defaultTriggeredEvent.Trigger()
}

0 comments on commit 33436b4

Please sign in to comment.