diff --git a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go index e0ff4c34b..31202a46e 100644 --- a/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go +++ b/pkg/protocol/engine/blockdag/inmemoryblockdag/blockdag.go @@ -41,7 +41,7 @@ func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engin b := New(e.NewSubModule("BlockDAG"), e.Workers.CreateGroup("BlockDAG"), int(e.Storage.Settings().APIProvider().CommittedAPI().ProtocolParameters().MaxCommittableAge())*2, e.EvictionState, e.BlockCache, e.ErrorHandler("blockdag"), opts...) e.ConstructedEvent().OnTrigger(func() { - b.Init(e.SyncManager.LatestCommitment) + b.latestCommitmentFunc = e.SyncManager.LatestCommitment wp := b.workers.CreatePool("BlockDAG.Append", workerpool.WithWorkerCount(2)) @@ -59,6 +59,8 @@ func NewProvider(opts ...options.Option[BlockDAG]) module.Provider[*engine.Engin }, event.WithWorkerPool(wp)) e.Events.BlockDAG.LinkTo(b.events) + + b.InitializedEvent().Trigger() }) return b @@ -82,12 +84,6 @@ func New(subModule module.Module, workers *workerpool.Group, unsolidCommitmentBu }) } -func (b *BlockDAG) Init(latestCommitmentFunc func() *model.Commitment) { - b.latestCommitmentFunc = latestCommitmentFunc - - b.InitializedEvent().Trigger() -} - // Append is used to append new Blocks to the BlockDAG. It is the main function of the BlockDAG that triggers Events. func (b *BlockDAG) Append(modelBlock *model.Block) (block *blocks.Block, wasAppended bool, err error) { if block, wasAppended, err = b.append(modelBlock); wasAppended { diff --git a/pkg/protocol/engine/booker/inmemorybooker/booker.go b/pkg/protocol/engine/booker/inmemorybooker/booker.go index aa7da90ca..af5e66aef 100644 --- a/pkg/protocol/engine/booker/inmemorybooker/booker.go +++ b/pkg/protocol/engine/booker/inmemorybooker/booker.go @@ -48,6 +48,8 @@ func NewProvider(opts ...options.Option[Booker]) module.Provider[*engine.Engine, }) e.Events.Booker.LinkTo(b.events) + + b.InitializedEvent().Trigger() }) return b diff --git a/pkg/protocol/engine/clock/blocktime/clock.go b/pkg/protocol/engine/clock/blocktime/clock.go index d009dcb58..0c83896b0 100644 --- a/pkg/protocol/engine/clock/blocktime/clock.go +++ b/pkg/protocol/engine/clock/blocktime/clock.go @@ -35,52 +35,64 @@ type Clock struct { // NewProvider creates a new Clock provider with the given options. func NewProvider(opts ...options.Option[Clock]) module.Provider[*engine.Engine, clock.Clock] { return module.Provide(func(e *engine.Engine) clock.Clock { - return options.Apply(&Clock{ - Module: e.NewSubModule("Clock"), - acceptedTime: NewRelativeTime(), - confirmedTime: NewRelativeTime(), - workerPool: e.Workers.CreatePool("Clock", workerpool.WithWorkerCount(1), workerpool.WithCancelPendingTasksOnShutdown(true), workerpool.WithPanicOnSubmitAfterShutdown(true)), - }, opts, func(c *Clock) { - e.ConstructedEvent().OnTrigger(func() { - latestCommitmentIndex := e.Storage.Settings().LatestCommitment().Slot() - c.acceptedTime.Set(e.APIForSlot(latestCommitmentIndex).TimeProvider().SlotEndTime(latestCommitmentIndex)) - - latestFinalizedSlotIndex := e.Storage.Settings().LatestFinalizedSlot() - c.confirmedTime.Set(e.APIForSlot(latestFinalizedSlotIndex).TimeProvider().SlotEndTime(latestFinalizedSlotIndex)) - - e.Events.Clock.AcceptedTimeUpdated.LinkTo(c.acceptedTime.OnUpdated) - e.Events.Clock.ConfirmedTimeUpdated.LinkTo(c.confirmedTime.OnUpdated) - - asyncOpt := event.WithWorkerPool(c.workerPool) - c.ShutdownEvent().OnTrigger(lo.Batch( - e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) { - c.acceptedTime.Advance(block.IssuingTime()) - }, asyncOpt).Unhook, - - e.Events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) { - c.confirmedTime.Advance(block.IssuingTime()) - }, asyncOpt).Unhook, - - e.Events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) { - timeProvider := e.APIForSlot(slot).TimeProvider() - slotEndTime := timeProvider.SlotEndTime(slot) - - c.acceptedTime.Advance(slotEndTime) - c.confirmedTime.Advance(slotEndTime) - }, asyncOpt).Unhook, - - func() { - c.workerPool.Shutdown() - - c.StoppedEvent().Trigger() - }, - )) - - c.InitializedEvent().Trigger() + c := New(e.NewSubModule("Clock"), e, opts...) + + e.ConstructedEvent().OnTrigger(func() { + latestCommitmentIndex := e.Storage.Settings().LatestCommitment().Slot() + c.acceptedTime.Set(e.APIForSlot(latestCommitmentIndex).TimeProvider().SlotEndTime(latestCommitmentIndex)) + + latestFinalizedSlotIndex := e.Storage.Settings().LatestFinalizedSlot() + c.confirmedTime.Set(e.APIForSlot(latestFinalizedSlotIndex).TimeProvider().SlotEndTime(latestFinalizedSlotIndex)) + + e.Events.Clock.AcceptedTimeUpdated.LinkTo(c.acceptedTime.OnUpdated) + e.Events.Clock.ConfirmedTimeUpdated.LinkTo(c.confirmedTime.OnUpdated) + + asyncOpt := event.WithWorkerPool(c.workerPool) + + unhook := lo.Batch( + e.Events.BlockGadget.BlockAccepted.Hook(func(block *blocks.Block) { + c.acceptedTime.Advance(block.IssuingTime()) + }, asyncOpt).Unhook, + + e.Events.BlockGadget.BlockConfirmed.Hook(func(block *blocks.Block) { + c.confirmedTime.Advance(block.IssuingTime()) + }, asyncOpt).Unhook, + + e.Events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) { + timeProvider := e.APIForSlot(slot).TimeProvider() + slotEndTime := timeProvider.SlotEndTime(slot) + + c.acceptedTime.Advance(slotEndTime) + c.confirmedTime.Advance(slotEndTime) + }, asyncOpt).Unhook, + ) + + c.ShutdownEvent().OnTrigger(func() { + unhook() + c.workerPool.Shutdown() + + c.StoppedEvent().Trigger() }) - c.ConstructedEvent().Trigger() + c.InitializedEvent().Trigger() + }) + + return c + }) +} + +func New(subModule module.Module, engine *engine.Engine, opts ...options.Option[Clock]) *Clock { + return options.Apply(&Clock{ + Module: subModule, + acceptedTime: NewRelativeTime(), + confirmedTime: NewRelativeTime(), + workerPool: engine.Workers.CreatePool("Clock", workerpool.WithWorkerCount(1), workerpool.WithCancelPendingTasksOnShutdown(true), workerpool.WithPanicOnSubmitAfterShutdown(true)), + }, opts, func(c *Clock) { + c.ShutdownEvent().OnTrigger(func() { + c.workerPool.Shutdown() }) + + c.ConstructedEvent().Trigger() }) } diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index bc37cdfb4..445a6a7f3 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -62,7 +62,6 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi return e.SyncManager.LatestCommitment().Slot() } s.blockCache = e.BlockCache - e.Events.Scheduler.LinkTo(s.events) e.SybilProtection.InitializedEvent().OnTrigger(func() { s.seatManager = e.SybilProtection.SeatManager() }) @@ -103,7 +102,6 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi return 1 + Deficit(mana), nil } }) - s.ConstructedEvent().Trigger() e.Events.Booker.BlockBooked.Hook(func(block *blocks.Block) { s.AddBlock(block) s.selectBlockToScheduleWithLocking() @@ -122,22 +120,28 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi }) e.InitializedEvent().OnTrigger(s.Start) + + e.Events.Scheduler.LinkTo(s.events) + + s.InitializedEvent().Trigger() }) return s }) } -func New(module module.Module, apiProvider iotago.APIProvider, opts ...options.Option[Scheduler]) *Scheduler { +func New(subModule module.Module, apiProvider iotago.APIProvider, opts ...options.Option[Scheduler]) *Scheduler { return options.Apply( &Scheduler{ - Module: module, + Module: subModule, events: scheduler.NewEvents(), deficits: shrinkingmap.New[iotago.AccountID, Deficit](), apiProvider: apiProvider, validatorBuffer: NewValidatorBuffer(), }, opts, func(s *Scheduler) { s.ShutdownEvent().OnTrigger(s.shutdown) + + s.ConstructedEvent().Trigger() }, ) } diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go index 44f06bca3..15193b2fe 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/passthrough/scheduler.go @@ -2,6 +2,7 @@ package passthrough import ( "github.com/iotaledger/hive.go/runtime/module" + "github.com/iotaledger/hive.go/runtime/options" "github.com/iotaledger/iota-core/pkg/protocol/engine" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" "github.com/iotaledger/iota-core/pkg/protocol/engine/congestioncontrol/scheduler" @@ -19,11 +20,13 @@ func NewProvider() module.Provider[*engine.Engine, scheduler.Scheduler] { s := New(e.NewSubModule("Scheduler")) e.ConstructedEvent().OnTrigger(func() { - e.Events.Scheduler.LinkTo(s.events) - e.Events.Booker.BlockBooked.Hook(func(block *blocks.Block) { s.AddBlock(block) }) + + e.Events.Scheduler.LinkTo(s.events) + + s.InitializedEvent().Trigger() }) return s @@ -31,9 +34,15 @@ func NewProvider() module.Provider[*engine.Engine, scheduler.Scheduler] { } func New(subModule module.Module) *Scheduler { - return module.InitSimpleLifecycle(&Scheduler{ + return options.Apply(&Scheduler{ Module: subModule, events: scheduler.NewEvents(), + }, nil, func(s *Scheduler) { + s.ShutdownEvent().OnTrigger(func() { + s.StoppedEvent().Trigger() + }) + + s.ConstructedEvent().Trigger() }) } diff --git a/pkg/protocol/engine/consensus/blockgadget/thresholdblockgadget/gadget.go b/pkg/protocol/engine/consensus/blockgadget/thresholdblockgadget/gadget.go index 866e3e3bd..493ff97b5 100644 --- a/pkg/protocol/engine/consensus/blockgadget/thresholdblockgadget/gadget.go +++ b/pkg/protocol/engine/consensus/blockgadget/thresholdblockgadget/gadget.go @@ -36,17 +36,21 @@ func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine, return module.Provide(func(e *engine.Engine) blockgadget.Gadget { g := New(e.NewSubModule("ThresholdBlockGadget"), e.BlockCache, e.SybilProtection.SeatManager(), e.ErrorHandler("gadget"), opts...) - wp := e.Workers.CreatePool("ThresholdBlockGadget", workerpool.WithWorkerCount(1)) - e.Events.Booker.BlockBooked.Hook(g.TrackWitnessWeight, event.WithWorkerPool(wp)) + e.ConstructedEvent().OnTrigger(func() { + wp := e.Workers.CreatePool("ThresholdBlockGadget", workerpool.WithWorkerCount(1)) + e.Events.Booker.BlockBooked.Hook(g.TrackWitnessWeight, event.WithWorkerPool(wp)) - e.Events.BlockGadget.LinkTo(g.events) + e.Events.BlockGadget.LinkTo(g.events) + + g.InitializedEvent().Trigger() + }) return g }) } func New(subModule module.Module, blockCache *blocks.Blocks, seatManager seatmanager.SeatManager, errorHandler func(error), opts ...options.Option[Gadget]) *Gadget { - return module.InitSimpleLifecycle(options.Apply(&Gadget{ + return options.Apply(&Gadget{ Module: subModule, events: blockgadget.NewEvents(), seatManager: seatManager, @@ -56,7 +60,13 @@ func New(subModule module.Module, blockCache *blocks.Blocks, seatManager seatman optsAcceptanceThreshold: 0.67, optsConfirmationThreshold: 0.67, optsConfirmationRatificationThreshold: 2, - }, opts)) + }, opts, func(g *Gadget) { + g.ShutdownEvent().OnTrigger(func() { + g.StoppedEvent().Trigger() + }) + + g.ConstructedEvent().Trigger() + }) } func (g *Gadget) Events() *blockgadget.Events { diff --git a/pkg/protocol/engine/consensus/slotgadget/totalweightslotgadget/gadget.go b/pkg/protocol/engine/consensus/slotgadget/totalweightslotgadget/gadget.go index 8af394564..84dd24ec8 100644 --- a/pkg/protocol/engine/consensus/slotgadget/totalweightslotgadget/gadget.go +++ b/pkg/protocol/engine/consensus/slotgadget/totalweightslotgadget/gadget.go @@ -37,20 +37,10 @@ type Gadget struct { func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine, slotgadget.Gadget] { return module.Provide(func(e *engine.Engine) slotgadget.Gadget { - return options.Apply(&Gadget{ - Module: e.NewSubModule("TotalWeightSlotGadget"), - events: slotgadget.NewEvents(), - slotTrackers: shrinkingmap.New[iotago.SlotIndex, *slottracker.SlotTracker](), - optsSlotFinalizationThreshold: 0.67, - errorHandler: e.ErrorHandler("slotgadget"), - }, opts, func(g *Gadget) { - e.Events.SlotGadget.LinkTo(g.events) - - e.ConstructedEvent().OnTrigger(func() { - g.seatManager = e.SybilProtection.SeatManager() + g := New(e.NewSubModule("TotalWeightSlotGadget"), e, opts...) - e.Events.BlockGadget.BlockConfirmed.Hook(g.trackVotes) - }) + e.ConstructedEvent().OnTrigger(func() { + g.seatManager = e.SybilProtection.SeatManager() g.storeLastFinalizedSlotFunc = func(slot iotago.SlotIndex) { if err := e.Storage.Settings().SetLatestFinalizedSlot(slot); err != nil { @@ -58,6 +48,10 @@ func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine, } } + g.ConstructedEvent().Trigger() + + e.Events.BlockGadget.BlockConfirmed.Hook(g.trackVotes) + e.InitializedEvent().OnTrigger(func() { // Can't use setter here as it has a side effect. g.mutex.Lock() @@ -67,11 +61,25 @@ func NewProvider(opts ...options.Option[Gadget]) module.Provider[*engine.Engine, g.InitializedEvent().Trigger() }) - g.ShutdownEvent().OnTrigger(func() { - g.StoppedEvent().Trigger() - }) + e.Events.SlotGadget.LinkTo(g.events) - g.ConstructedEvent().Trigger() + g.InitializedEvent().Trigger() + }) + + return g + }) +} + +func New(subModule module.Module, engine *engine.Engine, opts ...options.Option[Gadget]) *Gadget { + return options.Apply(&Gadget{ + Module: subModule, + events: slotgadget.NewEvents(), + slotTrackers: shrinkingmap.New[iotago.SlotIndex, *slottracker.SlotTracker](), + optsSlotFinalizationThreshold: 0.67, + errorHandler: engine.ErrorHandler("slotgadget"), + }, opts, func(g *Gadget) { + g.ShutdownEvent().OnTrigger(func() { + g.StoppedEvent().Trigger() }) }) } diff --git a/pkg/protocol/engine/filter/postsolidfilter/postsolidblockfilter/post_solid_block_filter.go b/pkg/protocol/engine/filter/postsolidfilter/postsolidblockfilter/post_solid_block_filter.go index 66ef9acdf..dc2a04252 100644 --- a/pkg/protocol/engine/filter/postsolidfilter/postsolidblockfilter/post_solid_block_filter.go +++ b/pkg/protocol/engine/filter/postsolidfilter/postsolidblockfilter/post_solid_block_filter.go @@ -38,15 +38,17 @@ func NewProvider(opts ...options.Option[PostSolidBlockFilter]) module.Provider[* e.Ledger.InitializedEvent().OnTrigger(func() { c.Init(e.Ledger.Account, e.BlockCache.Block, e.Ledger.RMCManager().RMC) }) + + c.InitializedEvent().Trigger() }) return c }) } -func New(module module.Module, opts ...options.Option[PostSolidBlockFilter]) *PostSolidBlockFilter { +func New(subModule module.Module, opts ...options.Option[PostSolidBlockFilter]) *PostSolidBlockFilter { return options.Apply(&PostSolidBlockFilter{ - Module: module, + Module: subModule, events: postsolidfilter.NewEvents(), }, opts, func(p *PostSolidBlockFilter) { p.ShutdownEvent().OnTrigger(func() { diff --git a/pkg/protocol/engine/filter/presolidfilter/presolidblockfilter/pre_solid_block_filter.go b/pkg/protocol/engine/filter/presolidfilter/presolidblockfilter/pre_solid_block_filter.go index 8f79bea0d..0e95d09b2 100644 --- a/pkg/protocol/engine/filter/presolidfilter/presolidblockfilter/pre_solid_block_filter.go +++ b/pkg/protocol/engine/filter/presolidfilter/presolidblockfilter/pre_solid_block_filter.go @@ -39,6 +39,8 @@ func NewProvider(opts ...options.Option[PreSolidBlockFilter]) module.Provider[*e }) e.Events.PreSolidFilter.LinkTo(f.events) + + f.InitializedEvent().Trigger() }) return f @@ -46,9 +48,9 @@ func NewProvider(opts ...options.Option[PreSolidBlockFilter]) module.Provider[*e } // New creates a new PreSolidBlockFilter. -func New(module module.Module, apiProvider iotago.APIProvider, opts ...options.Option[PreSolidBlockFilter]) *PreSolidBlockFilter { +func New(subModule module.Module, apiProvider iotago.APIProvider, opts ...options.Option[PreSolidBlockFilter]) *PreSolidBlockFilter { return options.Apply(&PreSolidBlockFilter{ - Module: module, + Module: subModule, events: presolidfilter.NewEvents(), apiProvider: apiProvider, }, opts, func(p *PreSolidBlockFilter) { diff --git a/pkg/protocol/engine/notarization/slotnotarization/manager.go b/pkg/protocol/engine/notarization/slotnotarization/manager.go index 94c5039c8..d14e14167 100644 --- a/pkg/protocol/engine/notarization/slotnotarization/manager.go +++ b/pkg/protocol/engine/notarization/slotnotarization/manager.go @@ -6,6 +6,7 @@ import ( "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/module" + "github.com/iotaledger/hive.go/runtime/options" "github.com/iotaledger/hive.go/runtime/syncutils" "github.com/iotaledger/hive.go/runtime/workerpool" "github.com/iotaledger/hive.go/serializer/v2/serix" @@ -49,17 +50,11 @@ type Manager struct { func NewProvider() module.Provider[*engine.Engine, notarization.Notarization] { return module.Provide(func(e *engine.Engine) notarization.Notarization { - logger := e.NewChildLogger("NotarizationManager") - - m := NewManager(e.NewSubModule("NotarizationManager"), e.Workers.CreateGroup("NotarizationManager"), e.ErrorHandler("notarization")) - m.ShutdownEvent().OnTrigger(logger.Shutdown) - - m.apiProvider = e + m := NewManager(e.NewSubModule("NotarizationManager"), e) e.ConstructedEvent().OnTrigger(func() { m.storage = e.Storage m.acceptedTimeFunc = e.Clock.Accepted().Time - m.ledger = e.Ledger m.sybilProtection = e.SybilProtection m.tipSelection = e.TipSelection @@ -80,6 +75,7 @@ func NewProvider() module.Provider[*engine.Engine, notarization.Notarization] { e.Events.Notarization.LinkTo(m.events) m.slotMutations = NewSlotMutations(e.Storage.Settings().LatestCommitment().Slot()) + m.InitializedEvent().Trigger() }) @@ -87,19 +83,18 @@ func NewProvider() module.Provider[*engine.Engine, notarization.Notarization] { }) } -func NewManager(subModule module.Module, workers *workerpool.Group, errorHandler func(error)) *Manager { - m := &Manager{ +func NewManager(subModule module.Module, engine *engine.Engine) *Manager { + return options.Apply(&Manager{ Module: subModule, events: notarization.NewEvents(), - workers: workers, - errorHandler: errorHandler, - } - - m.ShutdownEvent().OnTrigger(m.Shutdown) + workers: engine.Workers.CreateGroup("NotarizationManager"), + errorHandler: engine.ErrorHandler("notarization"), + apiProvider: engine, + }, nil, func(m *Manager) { + m.ShutdownEvent().OnTrigger(m.Shutdown) - m.ConstructedEvent().Trigger() - - return m + m.ConstructedEvent().Trigger() + }) } func (m *Manager) Shutdown() { diff --git a/pkg/protocol/engine/syncmanager/trivialsyncmanager/syncmanager.go b/pkg/protocol/engine/syncmanager/trivialsyncmanager/syncmanager.go index d0ccdb7c1..e6177b773 100644 --- a/pkg/protocol/engine/syncmanager/trivialsyncmanager/syncmanager.go +++ b/pkg/protocol/engine/syncmanager/trivialsyncmanager/syncmanager.go @@ -84,45 +84,49 @@ func NewProvider(opts ...options.Option[SyncManager]) module.Provider[*engine.En s := New(e.NewSubModule("SyncManager"), e, e.Storage.Settings().LatestCommitment(), e.Storage.Settings().LatestFinalizedSlot(), opts...) asyncOpt := event.WithWorkerPool(e.Workers.CreatePool("SyncManager", workerpool.WithWorkerCount(1))) - e.Events.BlockGadget.BlockAccepted.Hook(func(b *blocks.Block) { - if s.updateLastAcceptedBlock(b.ID()) { - s.triggerUpdate() - } - }, asyncOpt) - - e.Events.BlockGadget.BlockConfirmed.Hook(func(b *blocks.Block) { - if s.updateLastConfirmedBlock(b.ID()) { - s.triggerUpdate() - } - }, asyncOpt) - - e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) { - var bootstrapChanged bool - if !s.IsBootstrapped() { - bootstrapChanged = s.updateBootstrappedStatus() - } - - syncChanged := s.updateSyncStatus() - commitmentChanged := s.updateLatestCommitment(commitment) - - if bootstrapChanged || syncChanged || commitmentChanged { - s.triggerUpdate() - } - }, asyncOpt) - - e.Events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) { - if s.updateFinalizedSlot(slot) { - s.triggerUpdate() - } - }, asyncOpt) - - e.Storage.Pruned.Hook(func(epoch iotago.EpochIndex) { - if s.updatePrunedEpoch(epoch, true) { - s.triggerUpdate() - } - }, asyncOpt) - - e.Events.SyncManager.LinkTo(s.events) + e.ConstructedEvent().OnTrigger(func() { + e.Events.BlockGadget.BlockAccepted.Hook(func(b *blocks.Block) { + if s.updateLastAcceptedBlock(b.ID()) { + s.triggerUpdate() + } + }, asyncOpt) + + e.Events.BlockGadget.BlockConfirmed.Hook(func(b *blocks.Block) { + if s.updateLastConfirmedBlock(b.ID()) { + s.triggerUpdate() + } + }, asyncOpt) + + e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) { + var bootstrapChanged bool + if !s.IsBootstrapped() { + bootstrapChanged = s.updateBootstrappedStatus() + } + + syncChanged := s.updateSyncStatus() + commitmentChanged := s.updateLatestCommitment(commitment) + + if bootstrapChanged || syncChanged || commitmentChanged { + s.triggerUpdate() + } + }, asyncOpt) + + e.Events.SlotGadget.SlotFinalized.Hook(func(slot iotago.SlotIndex) { + if s.updateFinalizedSlot(slot) { + s.triggerUpdate() + } + }, asyncOpt) + + e.Storage.Pruned.Hook(func(epoch iotago.EpochIndex) { + if s.updatePrunedEpoch(epoch, true) { + s.triggerUpdate() + } + }, asyncOpt) + + e.Events.SyncManager.LinkTo(s.events) + + s.InitializedEvent().Trigger() + }) return s }) @@ -131,7 +135,7 @@ func NewProvider(opts ...options.Option[SyncManager]) module.Provider[*engine.En func New(subModule module.Module, e *engine.Engine, latestCommitment *model.Commitment, finalizedSlot iotago.SlotIndex, opts ...options.Option[SyncManager]) *SyncManager { ctxUpdateSyncStatusTicker, ctxCancelUpdateSyncStatusTicker := context.WithCancel(context.Background()) - return module.InitSimpleLifecycle(options.Apply(&SyncManager{ + return options.Apply(&SyncManager{ Module: subModule, events: syncmanager.NewEvents(), engine: e, @@ -164,14 +168,18 @@ func New(subModule module.Module, e *engine.Engine, latestCommitment *model.Comm return time.Since(e.Clock.Accepted().RelativeTime()) < s.optsBootstrappedThreshold && e.Notarization.IsBootstrapped() } } - }), func(syncManager *SyncManager) { - // stop the ticker when the engine is shutting down - ctxCancelUpdateSyncStatusTicker() - // wait for the ticker to gracefully shut down - syncManager.isSyncedTicker.WaitForGracefulShutdown() + s.ShutdownEvent().OnTrigger(func() { + // stop the ticker when the engine is shutting down + ctxCancelUpdateSyncStatusTicker() + + // wait for the ticker to gracefully shut down + s.isSyncedTicker.WaitForGracefulShutdown() + + s.StoppedEvent().Trigger() + }) - syncManager.Module.StoppedEvent().Trigger() + s.ConstructedEvent().Trigger() }) } diff --git a/pkg/protocol/engine/tipmanager/v1/provider.go b/pkg/protocol/engine/tipmanager/v1/provider.go index 8debf6311..797c54911 100644 --- a/pkg/protocol/engine/tipmanager/v1/provider.go +++ b/pkg/protocol/engine/tipmanager/v1/provider.go @@ -29,10 +29,8 @@ func NewProvider() module.Provider[*engine.Engine, tipmanager.TipManager] { e.Events.SeatManager.OnlineCommitteeSeatRemoved.Hook(t.RemoveSeat) e.Events.TipManager.BlockAdded.LinkTo(t.blockAdded) - }) - e.ShutdownEvent().OnTrigger(func() { - t.ShutdownEvent().Trigger() + t.InitializedEvent().Trigger() }) return t diff --git a/pkg/protocol/engine/tipmanager/v1/tip_manager.go b/pkg/protocol/engine/tipmanager/v1/tip_manager.go index 4ac94a8fb..814d3e788 100644 --- a/pkg/protocol/engine/tipmanager/v1/tip_manager.go +++ b/pkg/protocol/engine/tipmanager/v1/tip_manager.go @@ -8,6 +8,7 @@ import ( "github.com/iotaledger/hive.go/log" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/module" + "github.com/iotaledger/hive.go/runtime/options" "github.com/iotaledger/hive.go/runtime/syncutils" "github.com/iotaledger/iota-core/pkg/core/account" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" @@ -57,7 +58,7 @@ func New( blockRetriever func(blockID iotago.BlockID) (block *blocks.Block, exists bool), retrieveCommitteeInSlot func(slot iotago.SlotIndex) (*account.SeatedAccounts, bool), ) *TipManager { - t := &TipManager{ + return options.Apply(&TipManager{ Module: subModule, retrieveBlock: blockRetriever, retrieveCommitteeInSlot: retrieveCommitteeInSlot, @@ -67,11 +68,15 @@ func New( strongTipSet: randommap.New[iotago.BlockID, *TipMetadata](), weakTipSet: randommap.New[iotago.BlockID, *TipMetadata](), blockAdded: event.New1[tipmanager.TipMetadata](), - } + }, nil, func(t *TipManager) { + t.initLogging() - t.initLogging() + t.ShutdownEvent().OnTrigger(func() { + t.StoppedEvent().Trigger() + }) - return module.InitSimpleLifecycle(t) + t.ConstructedEvent().Trigger() + }) } // AddBlock adds a Block to the TipManager and returns the TipMetadata if the Block was added successfully. diff --git a/pkg/protocol/engine/tipselection/v1/provider.go b/pkg/protocol/engine/tipselection/v1/provider.go index 336fe18f7..884db1994 100644 --- a/pkg/protocol/engine/tipselection/v1/provider.go +++ b/pkg/protocol/engine/tipselection/v1/provider.go @@ -25,10 +25,8 @@ func NewProvider(opts ...options.Option[TipSelection]) module.Provider[*engine.E return e.SybilProtection.SeatManager().OnlineCommittee().Size() })) }) - }) - e.ShutdownEvent().OnTrigger(func() { - t.ShutdownEvent().Trigger() + t.InitializedEvent().Trigger() }) return t diff --git a/pkg/protocol/engine/tipselection/v1/tip_selection.go b/pkg/protocol/engine/tipselection/v1/tip_selection.go index d693d779b..2b41bf709 100644 --- a/pkg/protocol/engine/tipselection/v1/tip_selection.go +++ b/pkg/protocol/engine/tipselection/v1/tip_selection.go @@ -68,16 +68,22 @@ type TipSelection struct { } // New is the constructor for the TipSelection. -func New(module module.Module, opts ...options.Option[TipSelection]) *TipSelection { +func New(subModule module.Module, opts ...options.Option[TipSelection]) *TipSelection { return options.Apply(&TipSelection{ - Module: module, + Module: subModule, livenessThresholdQueue: timed.NewPriorityQueue[tipmanager.TipMetadata](true), acceptanceTime: reactive.NewVariable[time.Time](monotonicallyIncreasing), optMaxStrongParents: 8, optMaxLikedInsteadReferences: 8, optMaxLikedInsteadReferencesPerParent: 4, optMaxWeakReferences: 8, - }, opts) + }, opts, func(t *TipSelection) { + t.ShutdownEvent().OnTrigger(func() { + t.StoppedEvent().Trigger() + }) + + t.ConstructedEvent().Trigger() + }) } // Construct fills in the dependencies of the TipSelection and triggers the constructed and initialized events of the diff --git a/pkg/protocol/engine/upgrade/signalingupgradeorchestrator/orchestrator.go b/pkg/protocol/engine/upgrade/signalingupgradeorchestrator/orchestrator.go index e67c27168..300aec42a 100644 --- a/pkg/protocol/engine/upgrade/signalingupgradeorchestrator/orchestrator.go +++ b/pkg/protocol/engine/upgrade/signalingupgradeorchestrator/orchestrator.go @@ -87,30 +87,33 @@ func NewProvider(opts ...options.Option[Orchestrator]) module.Provider[*engine.E opts..., ) - for _, protocolParams := range o.optsProtocolParameters { - storedProtocolParams := e.Storage.Settings().APIProvider().ProtocolParameters(protocolParams.Version()) - if storedProtocolParams != nil { - if lo.PanicOnErr(storedProtocolParams.Hash()) != lo.PanicOnErr(protocolParams.Hash()) { - panic(ierrors.Errorf("protocol parameters for version %d already exist with different hash", protocolParams.Version())) + e.ConstructedEvent().OnTrigger(func() { + for _, protocolParams := range o.optsProtocolParameters { + storedProtocolParams := e.Storage.Settings().APIProvider().ProtocolParameters(protocolParams.Version()) + if storedProtocolParams != nil { + if lo.PanicOnErr(storedProtocolParams.Hash()) != lo.PanicOnErr(protocolParams.Hash()) { + panic(ierrors.Errorf("protocol parameters for version %d already exist with different hash", protocolParams.Version())) + } + + if !storedProtocolParams.Equals(protocolParams) { + panic(ierrors.Errorf("protocol parameters for version %d already exist but are not equal", protocolParams.Version())) + } } - if !storedProtocolParams.Equals(protocolParams) { - panic(ierrors.Errorf("protocol parameters for version %d already exist but are not equal", protocolParams.Version())) + if err := e.Storage.Settings().StoreProtocolParameters(protocolParams); err != nil { + panic(ierrors.Wrapf(err, "failed to store protocol parameters for version %d", protocolParams.Version())) } } - if err := e.Storage.Settings().StoreProtocolParameters(protocolParams); err != nil { - panic(ierrors.Wrapf(err, "failed to store protocol parameters for version %d", protocolParams.Version())) - } - } - - o.InitializedEvent().Trigger() + o.InitializedEvent().Trigger() + }) return o }) } -func NewOrchestrator(module module.Module, errorHandler func(error), +func NewOrchestrator(subModule module.Module, + errorHandler func(error), decidedUpgradeSignals epochstore.Store[model.VersionAndHash], upgradeSignalsFunc func(slot iotago.SlotIndex) (*slotstore.Store[account.SeatIndex, *model.SignaledBlock], error), apiProvider iotago.APIProvider, @@ -119,7 +122,7 @@ func NewOrchestrator(module module.Module, errorHandler func(error), epochForVersionFunc func(iotago.Version) (iotago.EpochIndex, bool), seatManager seatmanager.SeatManager, opts ...options.Option[Orchestrator]) *Orchestrator { return options.Apply(&Orchestrator{ - Module: module, + Module: subModule, errorHandler: errorHandler, latestSignals: memstorage.NewIndexedStorage[iotago.SlotIndex, account.SeatIndex, *model.SignaledBlock](), decidedUpgradeSignals: decidedUpgradeSignals, diff --git a/pkg/protocol/sybilprotection/seatmanager/poa/poa.go b/pkg/protocol/sybilprotection/seatmanager/poa/poa.go index 531a4078c..f2d8e976b 100644 --- a/pkg/protocol/sybilprotection/seatmanager/poa/poa.go +++ b/pkg/protocol/sybilprotection/seatmanager/poa/poa.go @@ -38,47 +38,52 @@ type SeatManager struct { // NewProvider returns a new sybil protection provider that uses the ProofOfAuthority module. func NewProvider(opts ...options.Option[SeatManager]) module.Provider[*engine.Engine, seatmanager.SeatManager] { return module.Provide(func(e *engine.Engine) seatmanager.SeatManager { - return options.Apply( - &SeatManager{ - Module: e.NewSubModule("ProofOfAuthoritySeatManager"), - events: seatmanager.NewEvents(), - apiProvider: e, - committeeStore: e.Storage.Committee(), - }, opts, func(s *SeatManager) { - activityTracker := activitytrackerv1.NewActivityTracker(e) - s.activityTracker = activityTracker - s.events.OnlineCommitteeSeatAdded.LinkTo(activityTracker.Events.OnlineCommitteeSeatAdded) - s.events.OnlineCommitteeSeatRemoved.LinkTo(activityTracker.Events.OnlineCommitteeSeatRemoved) - - e.Events.SeatManager.LinkTo(s.events) - - e.ConstructedEvent().OnTrigger(func() { - // We need to mark validators as active upon solidity of blocks as otherwise we would not be able to - // recover if no node was part of the online committee anymore. - e.Events.PostSolidFilter.BlockAllowed.Hook(func(block *blocks.Block) { - // Only track identities that are part of the committee. - committee, exists := s.CommitteeInSlot(block.ID().Slot()) - if !exists { - panic(ierrors.Errorf("committee not selected for slot %d, but received block in that slot", block.ID().Slot())) - } - - seat, exists := committee.GetSeat(block.ProtocolBlock().Header.IssuerID) - if exists { - s.activityTracker.MarkSeatActive(seat, block.ProtocolBlock().Header.IssuerID, block.IssuingTime()) - } - - s.events.BlockProcessed.Trigger(block) - }) - - s.InitializedEvent().Trigger() - }) - - s.ShutdownEvent().OnTrigger(func() { - s.StoppedEvent().Trigger() - }) - - s.ConstructedEvent().Trigger() + s := New(e.NewSubModule("ProofOfAuthoritySeatManager"), e, opts...) + + e.ConstructedEvent().OnTrigger(func() { + // We need to mark validators as active upon solidity of blocks as otherwise we would not be able to + // recover if no node was part of the online committee anymore. + e.Events.PostSolidFilter.BlockAllowed.Hook(func(block *blocks.Block) { + // Only track identities that are part of the committee. + committee, exists := s.CommitteeInSlot(block.ID().Slot()) + if !exists { + panic(ierrors.Errorf("committee not selected for slot %d, but received block in that slot", block.ID().Slot())) + } + + seat, exists := committee.GetSeat(block.ProtocolBlock().Header.IssuerID) + if exists { + s.activityTracker.MarkSeatActive(seat, block.ProtocolBlock().Header.IssuerID, block.IssuingTime()) + } + + s.events.BlockProcessed.Trigger(block) }) + + e.Events.SeatManager.LinkTo(s.events) + + s.InitializedEvent().Trigger() + }) + + return s + }) +} + +func New(subModule module.Module, engine *engine.Engine, opts ...options.Option[SeatManager]) *SeatManager { + return options.Apply(&SeatManager{ + Module: subModule, + events: seatmanager.NewEvents(), + apiProvider: engine, + committeeStore: engine.Storage.Committee(), + }, opts, func(s *SeatManager) { + activityTracker := activitytrackerv1.NewActivityTracker(engine) + s.activityTracker = activityTracker + s.events.OnlineCommitteeSeatAdded.LinkTo(activityTracker.Events.OnlineCommitteeSeatAdded) + s.events.OnlineCommitteeSeatRemoved.LinkTo(activityTracker.Events.OnlineCommitteeSeatRemoved) + + s.ShutdownEvent().OnTrigger(func() { + s.StoppedEvent().Trigger() + }) + + s.ConstructedEvent().Trigger() }) } diff --git a/pkg/protocol/sybilprotection/seatmanager/topstakers/topstakers.go b/pkg/protocol/sybilprotection/seatmanager/topstakers/topstakers.go index 3729c8b27..27f8304a9 100644 --- a/pkg/protocol/sybilprotection/seatmanager/topstakers/topstakers.go +++ b/pkg/protocol/sybilprotection/seatmanager/topstakers/topstakers.go @@ -39,47 +39,52 @@ type SeatManager struct { // NewProvider returns a new sybil protection provider that uses the ProofOfStake module. func NewProvider(opts ...options.Option[SeatManager]) module.Provider[*engine.Engine, seatmanager.SeatManager] { return module.Provide(func(e *engine.Engine) seatmanager.SeatManager { - return options.Apply( - &SeatManager{ - Module: e.NewSubModule("SeatManager"), - apiProvider: e, - events: seatmanager.NewEvents(), - committeeStore: e.Storage.Committee(), - }, opts, func(s *SeatManager) { - activityTracker := activitytrackerv1.NewActivityTracker(e) - s.activityTracker = activityTracker - s.events.OnlineCommitteeSeatAdded.LinkTo(activityTracker.Events.OnlineCommitteeSeatAdded) - s.events.OnlineCommitteeSeatRemoved.LinkTo(activityTracker.Events.OnlineCommitteeSeatRemoved) - - e.Events.SeatManager.LinkTo(s.events) - - e.ConstructedEvent().OnTrigger(func() { - // We need to mark validators as active upon solidity of blocks as otherwise we would not be able to - // recover if no node was part of the online committee anymore. - e.Events.PostSolidFilter.BlockAllowed.Hook(func(block *blocks.Block) { - // Only track identities that are part of the committee. - committee, exists := s.CommitteeInSlot(block.ID().Slot()) - if !exists { - panic(ierrors.Errorf("committee not selected for slot %d, but received block in that slot", block.ID().Slot())) - } - - seat, exists := committee.GetSeat(block.ProtocolBlock().Header.IssuerID) - if exists { - s.activityTracker.MarkSeatActive(seat, block.ProtocolBlock().Header.IssuerID, block.IssuingTime()) - } - - s.events.BlockProcessed.Trigger(block) - }) - - s.ShutdownEvent().OnTrigger(func() { - s.StoppedEvent().Trigger() - }) - - s.InitializedEvent().Trigger() - }) - - s.ConstructedEvent().Trigger() + s := New(e.NewSubModule("SeatManager"), e, opts...) + + e.ConstructedEvent().OnTrigger(func() { + // We need to mark validators as active upon solidity of blocks as otherwise we would not be able to + // recover if no node was part of the online committee anymore. + e.Events.PostSolidFilter.BlockAllowed.Hook(func(block *blocks.Block) { + // Only track identities that are part of the committee. + committee, exists := s.CommitteeInSlot(block.ID().Slot()) + if !exists { + panic(ierrors.Errorf("committee not selected for slot %d, but received block in that slot", block.ID().Slot())) + } + + seat, exists := committee.GetSeat(block.ProtocolBlock().Header.IssuerID) + if exists { + s.activityTracker.MarkSeatActive(seat, block.ProtocolBlock().Header.IssuerID, block.IssuingTime()) + } + + s.events.BlockProcessed.Trigger(block) }) + + e.Events.SeatManager.LinkTo(s.events) + + s.InitializedEvent().Trigger() + }) + + return s + }) +} + +func New(subModule module.Module, engine *engine.Engine, opts ...options.Option[SeatManager]) *SeatManager { + return options.Apply(&SeatManager{ + Module: subModule, + apiProvider: engine, + events: seatmanager.NewEvents(), + committeeStore: engine.Storage.Committee(), + }, opts, func(s *SeatManager) { + activityTracker := activitytrackerv1.NewActivityTracker(engine) + s.activityTracker = activityTracker + s.events.OnlineCommitteeSeatAdded.LinkTo(activityTracker.Events.OnlineCommitteeSeatAdded) + s.events.OnlineCommitteeSeatRemoved.LinkTo(activityTracker.Events.OnlineCommitteeSeatRemoved) + + s.ShutdownEvent().OnTrigger(func() { + s.StoppedEvent().Trigger() + }) + + s.ConstructedEvent().Trigger() }) } diff --git a/pkg/protocol/sybilprotection/sybilprotectionv1/sybilprotection.go b/pkg/protocol/sybilprotection/sybilprotectionv1/sybilprotection.go index fb9f867c4..00acef023 100644 --- a/pkg/protocol/sybilprotection/sybilprotectionv1/sybilprotection.go +++ b/pkg/protocol/sybilprotection/sybilprotectionv1/sybilprotection.go @@ -46,53 +46,60 @@ type SybilProtection struct { func NewProvider(opts ...options.Option[SybilProtection]) module.Provider[*engine.Engine, sybilprotection.SybilProtection] { return module.Provide(func(e *engine.Engine) sybilprotection.SybilProtection { - return options.Apply(&SybilProtection{ - Module: e.NewSubModule("SybilProtection"), - events: sybilprotection.NewEvents(), - - apiProvider: e, - optsSeatManagerProvider: topstakers.NewProvider(), - }, opts, func(o *SybilProtection) { - o.seatManager = o.optsSeatManagerProvider(e) - - e.ConstructedEvent().OnTrigger(func() { - o.ledger = e.Ledger - o.errHandler = e.ErrorHandler("SybilProtection") - logger := e.NewChildLogger("PerformanceTracker") - latestCommittedSlot := e.Storage.Settings().LatestCommitment().Slot() - latestCommittedEpoch := o.apiProvider.APIForSlot(latestCommittedSlot).TimeProvider().EpochFromSlot(latestCommittedSlot) - o.performanceTracker = performance.NewTracker(e.Storage.RewardsForEpoch, e.Storage.PoolStats(), e.Storage.Committee(), e.Storage.CommitteeCandidates, e.Storage.ValidatorPerformances, latestCommittedEpoch, e, o.errHandler, logger) - o.lastCommittedSlot = latestCommittedSlot - - if o.optsInitialCommittee != nil { - if _, err := o.seatManager.RotateCommittee(0, o.optsInitialCommittee); err != nil { - panic(ierrors.Wrap(err, "error while registering initial committee for epoch 0")) - } + s := New(e.NewSubModule("SybilProtection"), e, opts...) + + e.ConstructedEvent().OnTrigger(func() { + s.ledger = e.Ledger + s.errHandler = e.ErrorHandler("SybilProtection") + logger := e.NewChildLogger("PerformanceTracker") + latestCommittedSlot := e.Storage.Settings().LatestCommitment().Slot() + latestCommittedEpoch := s.apiProvider.APIForSlot(latestCommittedSlot).TimeProvider().EpochFromSlot(latestCommittedSlot) + s.performanceTracker = performance.NewTracker(e.Storage.RewardsForEpoch, e.Storage.PoolStats(), e.Storage.Committee(), e.Storage.CommitteeCandidates, e.Storage.ValidatorPerformances, latestCommittedEpoch, e, s.errHandler, logger) + s.lastCommittedSlot = latestCommittedSlot + + if s.optsInitialCommittee != nil { + if _, err := s.seatManager.RotateCommittee(0, s.optsInitialCommittee); err != nil { + panic(ierrors.Wrap(err, "error while registering initial committee for epoch 0")) + } + } + + s.ConstructedEvent().Trigger() + + // When the engine is triggered initialized, snapshot has been read or database has been initialized properly, + // so the committee should be available in the performance manager. + e.InitializedEvent().OnTrigger(func() { + // Mark the committee for the last committed slot as active. + currentEpoch := e.CommittedAPI().TimeProvider().EpochFromSlot(e.Storage.Settings().LatestCommitment().Slot()) + err := s.seatManager.InitializeCommittee(currentEpoch, e.Clock.Accepted().RelativeTime()) + if err != nil { + panic(ierrors.Wrap(err, "error while initializing committee")) } - // When the engine is triggered initialized, snapshot has been read or database has been initialized properly, - // so the committee should be available in the performance manager. - e.InitializedEvent().OnTrigger(func() { - // Mark the committee for the last committed slot as active. - currentEpoch := e.CommittedAPI().TimeProvider().EpochFromSlot(e.Storage.Settings().LatestCommitment().Slot()) - err := o.seatManager.InitializeCommittee(currentEpoch, e.Clock.Accepted().RelativeTime()) - if err != nil { - panic(ierrors.Wrap(err, "error while initializing committee")) - } - - o.InitializedEvent().Trigger() - }) + s.InitializedEvent().Trigger() }) - e.Events.SlotGadget.SlotFinalized.Hook(o.slotFinalized) + e.Events.SlotGadget.SlotFinalized.Hook(s.slotFinalized) - e.Events.SybilProtection.LinkTo(o.events) + e.Events.SybilProtection.LinkTo(s.events) - o.ShutdownEvent().OnTrigger(func() { - o.StoppedEvent().Trigger() - }) + s.InitializedEvent().Trigger() + }) + + return s + }) +} - o.ConstructedEvent().Trigger() +func New(subModule module.Module, engine *engine.Engine, opts ...options.Option[SybilProtection]) *SybilProtection { + return options.Apply(&SybilProtection{ + Module: subModule, + events: sybilprotection.NewEvents(), + apiProvider: engine, + optsSeatManagerProvider: topstakers.NewProvider(), + }, opts, func(s *SybilProtection) { + s.seatManager = s.optsSeatManagerProvider(engine) + + s.ShutdownEvent().OnTrigger(func() { + s.StoppedEvent().Trigger() }) }) } diff --git a/pkg/retainer/blockretainer/block_retainer.go b/pkg/retainer/blockretainer/block_retainer.go index 5e75cbf5d..1e98e1494 100644 --- a/pkg/retainer/blockretainer/block_retainer.go +++ b/pkg/retainer/blockretainer/block_retainer.go @@ -7,6 +7,7 @@ import ( "github.com/iotaledger/hive.go/kvstore" "github.com/iotaledger/hive.go/runtime/event" "github.com/iotaledger/hive.go/runtime/module" + "github.com/iotaledger/hive.go/runtime/options" "github.com/iotaledger/hive.go/runtime/workerpool" "github.com/iotaledger/iota-core/pkg/model" "github.com/iotaledger/iota-core/pkg/protocol/engine" @@ -36,72 +37,71 @@ type BlockRetainer struct { module.Module } -func New(module module.Module, workersGroup *workerpool.Group, retainerStoreFunc StoreFunc, finalizedSlotFunc FinalizedSlotFunc, errorHandler func(error)) *BlockRetainer { - b := &BlockRetainer{ - Module: module, +func New(subModule module.Module, workersGroup *workerpool.Group, retainerStoreFunc StoreFunc, finalizedSlotFunc FinalizedSlotFunc, errorHandler func(error)) *BlockRetainer { + return options.Apply(&BlockRetainer{ + Module: subModule, events: retainer.NewBlockRetainerEvents(), - workerPool: workersGroup.CreatePool("Retainer", workerpool.WithWorkerCount(1)), + workerPool: workersGroup.CreatePool("BlockRetainer", workerpool.WithWorkerCount(1)), store: retainerStoreFunc, cache: newCache(), finalizedSlotFunc: finalizedSlotFunc, errorHandler: errorHandler, - } + }, nil, func(r *BlockRetainer) { + r.ShutdownEvent().OnTrigger(r.shutdown) - b.ShutdownEvent().OnTrigger(func() { - b.StoppedEvent().Trigger() + r.ConstructedEvent().Trigger() }) - - b.ConstructedEvent().Trigger() - - return b } // NewProvider creates a new BlockRetainer provider. func NewProvider() module.Provider[*engine.Engine, retainer.BlockRetainer] { return module.Provide(func(e *engine.Engine) retainer.BlockRetainer { - r := New(e.NewSubModule("BlockRetainer"), e.Workers.CreateGroup("Retainer"), + r := New(e.NewSubModule("BlockRetainer"), + e.Workers.CreateGroup("BlockRetainer"), e.Storage.BlockMetadata, func() iotago.SlotIndex { return e.SyncManager.LatestFinalizedSlot() }, - e.ErrorHandler("retainer")) + e.ErrorHandler("blockRetainer")) asyncOpt := event.WithWorkerPool(r.workerPool) - e.Events.Booker.BlockBooked.Hook(func(b *blocks.Block) { - if err := r.OnBlockBooked(b); err != nil { - r.errorHandler(ierrors.Wrap(err, "failed to store on BlockBooked in retainer")) - } - }, asyncOpt) + e.ConstructedEvent().OnTrigger(func() { + e.Events.Booker.BlockBooked.Hook(func(b *blocks.Block) { + if err := r.OnBlockBooked(b); err != nil { + r.errorHandler(ierrors.Wrap(err, "failed to store on BlockBooked in retainer")) + } + }, asyncOpt) - e.Events.BlockGadget.BlockAccepted.Hook(func(b *blocks.Block) { - if err := r.OnBlockAccepted(b.ID()); err != nil { - r.errorHandler(ierrors.Wrap(err, "failed to store on BlockAccepted in retainer")) - } - }, asyncOpt) + e.Events.BlockGadget.BlockAccepted.Hook(func(b *blocks.Block) { + if err := r.OnBlockAccepted(b.ID()); err != nil { + r.errorHandler(ierrors.Wrap(err, "failed to store on BlockAccepted in retainer")) + } + }, asyncOpt) - e.Events.BlockGadget.BlockConfirmed.Hook(func(b *blocks.Block) { - if err := r.OnBlockConfirmed(b.ID()); err != nil { - r.errorHandler(ierrors.Wrap(err, "failed to store on BlockConfirmed in retainer")) - } - }, asyncOpt) + e.Events.BlockGadget.BlockConfirmed.Hook(func(b *blocks.Block) { + if err := r.OnBlockConfirmed(b.ID()); err != nil { + r.errorHandler(ierrors.Wrap(err, "failed to store on BlockConfirmed in retainer")) + } + }, asyncOpt) - e.Events.Scheduler.BlockDropped.Hook(func(b *blocks.Block, _ error) { - if err := r.OnBlockDropped(b.ID()); err != nil { - r.errorHandler(ierrors.Wrap(err, "failed to store on BlockDropped in retainer")) - } - }) + e.Events.Scheduler.BlockDropped.Hook(func(b *blocks.Block, _ error) { + if err := r.OnBlockDropped(b.ID()); err != nil { + r.errorHandler(ierrors.Wrap(err, "failed to store on BlockDropped in retainer")) + } + }) - // this event is fired when a new commitment is detected - e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) { - if err := r.CommitSlot(commitment.Slot()); err != nil { - panic(err) - } - }, asyncOpt) + // this event is fired when a new commitment is detected + e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) { + if err := r.CommitSlot(commitment.Slot()); err != nil { + panic(err) + } + }, asyncOpt) - e.Events.BlockRetainer.BlockRetained.LinkTo(r.events.BlockRetained) + e.Events.BlockRetainer.BlockRetained.LinkTo(r.events.BlockRetained) - r.InitializedEvent().Trigger() + r.InitializedEvent().Trigger() + }) return r }) @@ -115,8 +115,11 @@ func (r *BlockRetainer) Reset() { r.cache.uncommittedBlockMetadata.Clear() } -func (r *BlockRetainer) Shutdown() { +// Shutdown shuts down the BlockRetainer. +func (r *BlockRetainer) shutdown() { r.workerPool.Shutdown() + + r.StoppedEvent().Trigger() } func (r *BlockRetainer) BlockMetadata(blockID iotago.BlockID) (*api.BlockMetadataResponse, error) { diff --git a/pkg/retainer/txretainer/tx_retainer.go b/pkg/retainer/txretainer/tx_retainer.go index faba1de84..8cfad09a7 100644 --- a/pkg/retainer/txretainer/tx_retainer.go +++ b/pkg/retainer/txretainer/tx_retainer.go @@ -111,9 +111,9 @@ func WithDebugStoreErrorMessages(store bool) options.Option[TransactionRetainer] } } -func New(parentModule module.Module, workersGroup *workerpool.Group, dbExecFunc storage.SQLDatabaseExecFunc, latestCommittedSlotFunc SlotFunc, finalizedSlotFunc SlotFunc, errorHandler func(error), opts ...options.Option[TransactionRetainer]) *TransactionRetainer { - return module.InitSimpleLifecycle(options.Apply(&TransactionRetainer{ - Module: parentModule.NewSubModule("TransactionRetainer"), +func New(subModule module.Module, workersGroup *workerpool.Group, dbExecFunc storage.SQLDatabaseExecFunc, latestCommittedSlotFunc SlotFunc, finalizedSlotFunc SlotFunc, errorHandler func(error), opts ...options.Option[TransactionRetainer]) *TransactionRetainer { + return options.Apply(&TransactionRetainer{ + Module: subModule, events: retainer.NewTransactionRetainerEvents(), workerPool: workersGroup.CreatePool("TxRetainer", workerpool.WithWorkerCount(1)), txRetainerCache: NewTransactionRetainerCache(), @@ -121,13 +121,18 @@ func New(parentModule module.Module, workersGroup *workerpool.Group, dbExecFunc latestCommittedSlotFunc: latestCommittedSlotFunc, finalizedSlotFunc: finalizedSlotFunc, errorHandler: errorHandler, - }, opts), (*TransactionRetainer).shutdown) + }, opts, func(r *TransactionRetainer) { + r.ShutdownEvent().OnTrigger(r.shutdown) + + r.ConstructedEvent().Trigger() + }) } // NewProvider creates a new TransactionRetainer provider. func NewProvider(opts ...options.Option[TransactionRetainer]) module.Provider[*engine.Engine, retainer.TransactionRetainer] { return module.Provide(func(e *engine.Engine) retainer.TransactionRetainer { - r := New(e, e.Workers.CreateGroup("TransactionRetainer"), + r := New(e.NewSubModule("TransactionRetainer"), + e.Workers.CreateGroup("TransactionRetainer"), e.Storage.TransactionRetainerDatabaseExecFunc(), func() iotago.SlotIndex { return e.SyncManager.LatestCommitment().Slot() @@ -141,7 +146,7 @@ func NewProvider(opts ...options.Option[TransactionRetainer]) module.Provider[*e asyncOpt := event.WithWorkerPool(r.workerPool) - e.InitializedEvent().OnTrigger(func() { + e.ConstructedEvent().OnTrigger(func() { // attaching the transaction failed for some reason => store the error // HINT: we treat the transaction as unsigned here, because we don't know if it was signed or not. // This should not be a problem, because the error reason will still be stored and visible to the user, @@ -244,11 +249,11 @@ func NewProvider(opts ...options.Option[TransactionRetainer]) module.Provider[*e r.errorHandler(err) } }, asyncOpt) - }) - e.Events.TransactionRetainer.TransactionRetained.LinkTo(r.events.TransactionRetained) + e.Events.TransactionRetainer.TransactionRetained.LinkTo(r.events.TransactionRetained) - r.InitializedEvent().Trigger() + r.InitializedEvent().Trigger() + }) return r })