Skip to content

Commit

Permalink
Merge pull request #888 from iotaledger/fix/module-lifecycle
Browse files Browse the repository at this point in the history
Fix module lifecycles
  • Loading branch information
muXxer authored Mar 28, 2024
2 parents a1b42be + a92c959 commit 96b3a2a
Show file tree
Hide file tree
Showing 21 changed files with 428 additions and 345 deletions.
10 changes: 3 additions & 7 deletions pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engin
b := New(e.NewSubModule("BlockDAG"), e.Workers.CreateGroup("BlockDAG"), int(e.Storage.Settings().APIProvider().CommittedAPI().ProtocolParameters().MaxCommittableAge())*2, e.EvictionState, e.BlockCache, e.ErrorHandler("blockdag"), opts...)

e.ConstructedEvent().OnTrigger(func() {
b.Init(e.SyncManager.LatestCommitment)
b.latestCommitmentFunc = e.SyncManager.LatestCommitment

wp := b.workers.CreatePool("BlockDAG.Append", workerpool.WithWorkerCount(2))

Expand All @@ -59,6 +59,8 @@ func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engin
}, event.WithWorkerPool(wp))

e.Events.BlockDAG.LinkTo(b.events)

b.InitializedEvent().Trigger()
})

return b
Expand All @@ -82,12 +84,6 @@ func New(subModule module.Module, workers *workerpool.Group, unsolidCommitmentBu
})
}

func (b *BlockDAG) Init(latestCommitmentFunc func() *model.Commitment) {
b.latestCommitmentFunc = latestCommitmentFunc

b.InitializedEvent().Trigger()
}

// Append is used to append new Blocks to the BlockDAG. It is the main function of the BlockDAG that triggers Events.
func (b *BlockDAG) Append(modelBlock *model.Block) (block *blocks.Block, wasAppended bool, err error) {
if block, wasAppended, err = b.append(modelBlock); wasAppended {
Expand Down
2 changes: 2 additions & 0 deletions pkg/protocol/engine/booker/inmemorybooker/booker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func NewProvider(opts ...options.Option[Booker]) module.Provider[*engine.Engine,
})

e.Events.Booker.LinkTo(b.events)

b.InitializedEvent().Trigger()
})

return b
Expand Down
98 changes: 55 additions & 43 deletions pkg/protocol/engine/clock/blocktime/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,52 +35,64 @@ type Clock struct {
// NewProvider creates a new Clock provider with the given options.
func NewProvider(opts ...options.Option[Clock]) module.Provider[*engine.Engine, clock.Clock] {
return module.Provide(func(e *engine.Engine) clock.Clock {
return options.Apply(&Clock{
Module: e.NewSubModule("Clock"),
acceptedTime: NewRelativeTime(),
confirmedTime: NewRelativeTime(),
workerPool: e.Workers.CreatePool("Clock", workerpool.WithWorkerCount(1), workerpool.WithCancelPendingTasksOnShutdown(true), workerpool.WithPanicOnSubmitAfterShutdown(true)),
}, opts, func(c *Clock) {
e.ConstructedEvent().OnTrigger(func() {
latestCommitmentIndex := e.Storage.Settings().LatestCommitment().Slot()
c.acceptedTime.Set(e.APIForSlot(latestCommitmentIndex).TimeProvider().SlotEndTime(latestCommitmentIndex))

latestFinalizedSlotIndex := e.Storage.Settings().LatestFinalizedSlot()
c.confirmedTime.Set(e.APIForSlot(latestFinalizedSlotIndex).TimeProvider().SlotEndTime(latestFinalizedSlotIndex))

e.Events.Clock.AcceptedTimeUpdated.LinkTo(c.acceptedTime.OnUpdated)
e.Events.Clock.ConfirmedTimeUpdated.LinkTo(c.confirmedTime.OnUpdated)

asyncOpt := event.WithWorkerPool(c.workerPool)
c.ShutdownEvent().OnTrigger(lo.Batch(
e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
c.acceptedTime.Advance(block.IssuingTime())
}, asyncOpt).Unhook,

e.Events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) {
c.confirmedTime.Advance(block.IssuingTime())
}, asyncOpt).Unhook,

e.Events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) {
timeProvider := e.APIForSlot(slot).TimeProvider()
slotEndTime := timeProvider.SlotEndTime(slot)

c.acceptedTime.Advance(slotEndTime)
c.confirmedTime.Advance(slotEndTime)
}, asyncOpt).Unhook,

func() {
c.workerPool.Shutdown()

c.StoppedEvent().Trigger()
},
))

c.InitializedEvent().Trigger()
c := New(e.NewSubModule("Clock"), e, opts...)

e.ConstructedEvent().OnTrigger(func() {
latestCommitmentIndex := e.Storage.Settings().LatestCommitment().Slot()
c.acceptedTime.Set(e.APIForSlot(latestCommitmentIndex).TimeProvider().SlotEndTime(latestCommitmentIndex))

latestFinalizedSlotIndex := e.Storage.Settings().LatestFinalizedSlot()
c.confirmedTime.Set(e.APIForSlot(latestFinalizedSlotIndex).TimeProvider().SlotEndTime(latestFinalizedSlotIndex))

e.Events.Clock.AcceptedTimeUpdated.LinkTo(c.acceptedTime.OnUpdated)
e.Events.Clock.ConfirmedTimeUpdated.LinkTo(c.confirmedTime.OnUpdated)

asyncOpt := event.WithWorkerPool(c.workerPool)

unhook := lo.Batch(
e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) {
c.acceptedTime.Advance(block.IssuingTime())
}, asyncOpt).Unhook,

e.Events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) {
c.confirmedTime.Advance(block.IssuingTime())
}, asyncOpt).Unhook,

e.Events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) {
timeProvider := e.APIForSlot(slot).TimeProvider()
slotEndTime := timeProvider.SlotEndTime(slot)

c.acceptedTime.Advance(slotEndTime)
c.confirmedTime.Advance(slotEndTime)
}, asyncOpt).Unhook,
)

c.ShutdownEvent().OnTrigger(func() {
unhook()
c.workerPool.Shutdown()

c.StoppedEvent().Trigger()
})

c.ConstructedEvent().Trigger()
c.InitializedEvent().Trigger()
})

return c
})
}

func New(subModule module.Module, engine *engine.Engine, opts ...options.Option[Clock]) *Clock {
return options.Apply(&Clock{
Module: subModule,
acceptedTime: NewRelativeTime(),
confirmedTime: NewRelativeTime(),
workerPool: engine.Workers.CreatePool("Clock", workerpool.WithWorkerCount(1), workerpool.WithCancelPendingTasksOnShutdown(true), workerpool.WithPanicOnSubmitAfterShutdown(true)),
}, opts, func(c *Clock) {
c.ShutdownEvent().OnTrigger(func() {
c.workerPool.Shutdown()
})

c.ConstructedEvent().Trigger()
})
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
return e.SyncManager.LatestCommitment().Slot()
}
s.blockCache = e.BlockCache
e.Events.Scheduler.LinkTo(s.events)
e.SybilProtection.InitializedEvent().OnTrigger(func() {
s.seatManager = e.SybilProtection.SeatManager()
})
Expand Down Expand Up @@ -103,7 +102,6 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
return 1 + Deficit(mana), nil
}
})
s.ConstructedEvent().Trigger()
e.Events.Booker.BlockBooked.Hook(func(block *blocks.Block) {
s.AddBlock(block)
s.selectBlockToScheduleWithLocking()
Expand All @@ -122,22 +120,28 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
})

e.InitializedEvent().OnTrigger(s.Start)

e.Events.Scheduler.LinkTo(s.events)

s.InitializedEvent().Trigger()
})

return s
})
}

func New(module module.Module, apiProvider iotago.APIProvider, opts ...options.Option[Scheduler]) *Scheduler {
func New(subModule module.Module, apiProvider iotago.APIProvider, opts ...options.Option[Scheduler]) *Scheduler {
return options.Apply(
&Scheduler{
Module: module,
Module: subModule,
events: scheduler.NewEvents(),
deficits: shrinkingmap.New[iotago.AccountID, Deficit](),
apiProvider: apiProvider,
validatorBuffer: NewValidatorBuffer(),
}, opts, func(s *Scheduler) {
s.ShutdownEvent().OnTrigger(s.shutdown)

s.ConstructedEvent().Trigger()
},
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package passthrough

import (
"github.com/iotaledger/hive.go/runtime/module"
"github.com/iotaledger/hive.go/runtime/options"
"github.com/iotaledger/iota-core/pkg/protocol/engine"
"github.com/iotaledger/iota-core/pkg/protocol/engine/blocks"
"github.com/iotaledger/iota-core/pkg/protocol/engine/congestioncontrol/scheduler"
Expand All @@ -19,21 +20,29 @@ func NewProvider() module.Provider[*engine.Engine, scheduler.Scheduler] {
s := New(e.NewSubModule("Scheduler"))

e.ConstructedEvent().OnTrigger(func() {
e.Events.Scheduler.LinkTo(s.events)

e.Events.Booker.BlockBooked.Hook(func(block *blocks.Block) {
s.AddBlock(block)
})

e.Events.Scheduler.LinkTo(s.events)

s.InitializedEvent().Trigger()
})

return s
})
}

func New(subModule module.Module) *Scheduler {
return module.InitSimpleLifecycle(&Scheduler{
return options.Apply(&Scheduler{
Module: subModule,
events: scheduler.NewEvents(),
}, nil, func(s *Scheduler) {
s.ShutdownEvent().OnTrigger(func() {
s.StoppedEvent().Trigger()
})

s.ConstructedEvent().Trigger()
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,21 @@ func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine,
return module.Provide(func(e *engine.Engine) blockgadget.Gadget {
g := New(e.NewSubModule("ThresholdBlockGadget"), e.BlockCache, e.SybilProtection.SeatManager(), e.ErrorHandler("gadget"), opts...)

wp := e.Workers.CreatePool("ThresholdBlockGadget", workerpool.WithWorkerCount(1))
e.Events.Booker.BlockBooked.Hook(g.TrackWitnessWeight, event.WithWorkerPool(wp))
e.ConstructedEvent().OnTrigger(func() {
wp := e.Workers.CreatePool("ThresholdBlockGadget", workerpool.WithWorkerCount(1))
e.Events.Booker.BlockBooked.Hook(g.TrackWitnessWeight, event.WithWorkerPool(wp))

e.Events.BlockGadget.LinkTo(g.events)
e.Events.BlockGadget.LinkTo(g.events)

g.InitializedEvent().Trigger()
})

return g
})
}

func New(subModule module.Module, blockCache *blocks.Blocks, seatManager seatmanager.SeatManager, errorHandler func(error), opts ...options.Option[Gadget]) *Gadget {
return module.InitSimpleLifecycle(options.Apply(&Gadget{
return options.Apply(&Gadget{
Module: subModule,
events: blockgadget.NewEvents(),
seatManager: seatManager,
Expand All @@ -56,7 +60,13 @@ func New(subModule module.Module, blockCache *blocks.Blocks, seatManager seatman
optsAcceptanceThreshold: 0.67,
optsConfirmationThreshold: 0.67,
optsConfirmationRatificationThreshold: 2,
}, opts))
}, opts, func(g *Gadget) {
g.ShutdownEvent().OnTrigger(func() {
g.StoppedEvent().Trigger()
})

g.ConstructedEvent().Trigger()
})
}

func (g *Gadget) Events() *blockgadget.Events {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,21 @@ type Gadget struct {

func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine, slotgadget.Gadget] {
return module.Provide(func(e *engine.Engine) slotgadget.Gadget {
return options.Apply(&Gadget{
Module: e.NewSubModule("TotalWeightSlotGadget"),
events: slotgadget.NewEvents(),
slotTrackers: shrinkingmap.New[iotago.SlotIndex, *slottracker.SlotTracker](),
optsSlotFinalizationThreshold: 0.67,
errorHandler: e.ErrorHandler("slotgadget"),
}, opts, func(g *Gadget) {
e.Events.SlotGadget.LinkTo(g.events)

e.ConstructedEvent().OnTrigger(func() {
g.seatManager = e.SybilProtection.SeatManager()
g := New(e.NewSubModule("TotalWeightSlotGadget"), e, opts...)

e.Events.BlockGadget.BlockConfirmed.Hook(g.trackVotes)
})
e.ConstructedEvent().OnTrigger(func() {
g.seatManager = e.SybilProtection.SeatManager()

g.storeLastFinalizedSlotFunc = func(slot iotago.SlotIndex) {
if err := e.Storage.Settings().SetLatestFinalizedSlot(slot); err != nil {
g.errorHandler(ierrors.Wrap(err, "failed to set latest finalized slot"))
}
}

g.ConstructedEvent().Trigger()

e.Events.BlockGadget.BlockConfirmed.Hook(g.trackVotes)

e.InitializedEvent().OnTrigger(func() {
// Can't use setter here as it has a side effect.
g.mutex.Lock()
Expand All @@ -67,11 +61,25 @@ func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine,
g.InitializedEvent().Trigger()
})

g.ShutdownEvent().OnTrigger(func() {
g.StoppedEvent().Trigger()
})
e.Events.SlotGadget.LinkTo(g.events)

g.ConstructedEvent().Trigger()
g.InitializedEvent().Trigger()
})

return g
})
}

func New(subModule module.Module, engine *engine.Engine, opts ...options.Option[Gadget]) *Gadget {
return options.Apply(&Gadget{
Module: subModule,
events: slotgadget.NewEvents(),
slotTrackers: shrinkingmap.New[iotago.SlotIndex, *slottracker.SlotTracker](),
optsSlotFinalizationThreshold: 0.67,
errorHandler: engine.ErrorHandler("slotgadget"),
}, opts, func(g *Gadget) {
g.ShutdownEvent().OnTrigger(func() {
g.StoppedEvent().Trigger()
})
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ func NewProvider(opts ...options.Option[PostSolidBlockFilter]) module.Provider[*
e.Ledger.InitializedEvent().OnTrigger(func() {
c.Init(e.Ledger.Account, e.BlockCache.Block, e.Ledger.RMCManager().RMC)
})

c.InitializedEvent().Trigger()
})

return c
})
}

func New(module module.Module, opts ...options.Option[PostSolidBlockFilter]) *PostSolidBlockFilter {
func New(subModule module.Module, opts ...options.Option[PostSolidBlockFilter]) *PostSolidBlockFilter {
return options.Apply(&PostSolidBlockFilter{
Module: module,
Module: subModule,
events: postsolidfilter.NewEvents(),
}, opts, func(p *PostSolidBlockFilter) {
p.ShutdownEvent().OnTrigger(func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ func NewProvider(opts ...options.Option[PreSolidBlockFilter]) module.Provider[*e
})

e.Events.PreSolidFilter.LinkTo(f.events)

f.InitializedEvent().Trigger()
})

return f
})
}

// New creates a new PreSolidBlockFilter.
func New(module module.Module, apiProvider iotago.APIProvider, opts ...options.Option[PreSolidBlockFilter]) *PreSolidBlockFilter {
func New(subModule module.Module, apiProvider iotago.APIProvider, opts ...options.Option[PreSolidBlockFilter]) *PreSolidBlockFilter {
return options.Apply(&PreSolidBlockFilter{
Module: module,
Module: subModule,
events: presolidfilter.NewEvents(),
apiProvider: apiProvider,
}, opts, func(p *PreSolidBlockFilter) {
Expand Down
Loading

0 comments on commit 96b3a2a

Please sign in to comment.