diff --git a/pkg/model/block.go b/pkg/model/block.go index 404847121..bc5e604fd 100644 --- a/pkg/model/block.go +++ b/pkg/model/block.go @@ -9,8 +9,6 @@ import ( ) type Block struct { - api iotago.API - blockID iotago.BlockID data []byte @@ -107,7 +105,7 @@ func (blk *Block) ValidationBlock() (validationBlock *iotago.ValidationBlock, is } func (blk *Block) String() string { - encode, err := blk.api.JSONEncode(blk.ProtocolBlock()) + encode, err := blk.protocolBlock.API.JSONEncode(blk.ProtocolBlock()) if err != nil { panic(err) } diff --git a/pkg/protocol/sybilprotection/seatmanager/poa/poa.go b/pkg/protocol/sybilprotection/seatmanager/poa/poa.go index 5207427c4..080a575df 100644 --- a/pkg/protocol/sybilprotection/seatmanager/poa/poa.go +++ b/pkg/protocol/sybilprotection/seatmanager/poa/poa.go @@ -22,16 +22,17 @@ import ( type SeatManager struct { events *seatmanager.Events - clock clock.Clock - timeProviderFunc func() *iotago.TimeProvider - workers *workerpool.Group - accounts *account.Accounts - committee *account.SeatedAccounts - onlineCommittee ds.Set[account.SeatIndex] - inactivityManager *timed.TaskExecutor[account.SeatIndex] - lastActivities *shrinkingmap.ShrinkingMap[account.SeatIndex, time.Time] - activityMutex syncutils.RWMutex - committeeMutex syncutils.RWMutex + clock clock.Clock + timeProviderFunc func() *iotago.TimeProvider + workers *workerpool.Group + accounts *account.Accounts + committee *account.SeatedAccounts + onlineCommittee ds.Set[account.SeatIndex] + inactivityQueue timed.PriorityQueue[account.SeatIndex] + lastActivities *shrinkingmap.ShrinkingMap[account.SeatIndex, time.Time] + lastActivityTime time.Time + activityMutex syncutils.RWMutex + committeeMutex syncutils.RWMutex optsActivityWindow time.Duration optsOnlineCommitteeStartup []iotago.AccountID @@ -44,12 +45,12 @@ func NewProvider(opts ...options.Option[SeatManager]) module.Provider[*engine.En return module.Provide(func(e *engine.Engine) seatmanager.SeatManager { return options.Apply( &SeatManager{ - events: seatmanager.NewEvents(), - workers: e.Workers.CreateGroup("SeatManager"), - accounts: account.NewAccounts(), - onlineCommittee: ds.NewSet[account.SeatIndex](), - inactivityManager: timed.NewTaskExecutor[account.SeatIndex](1), - lastActivities: shrinkingmap.New[account.SeatIndex, time.Time](), + events: seatmanager.NewEvents(), + workers: e.Workers.CreateGroup("SeatManager"), + accounts: account.NewAccounts(), + onlineCommittee: ds.NewSet[account.SeatIndex](), + inactivityQueue: timed.NewPriorityQueue[account.SeatIndex](true), + lastActivities: shrinkingmap.New[account.SeatIndex, time.Time](), optsActivityWindow: time.Second * 30, }, opts, func(s *SeatManager) { @@ -115,7 +116,6 @@ func (s *SeatManager) SeatCount() int { func (s *SeatManager) Shutdown() { s.TriggerStopped() - s.stopInactivityManager() s.workers.Shutdown() } @@ -152,34 +152,38 @@ func (s *SeatManager) SetCommittee(_ iotago.EpochIndex, validators *account.Acco s.committee = s.accounts.SelectCommittee(validators.IDs()...) } -func (s *SeatManager) stopInactivityManager() { - s.inactivityManager.Shutdown(timed.CancelPendingElements) -} - -func (s *SeatManager) markSeatActive(seat account.SeatIndex, id iotago.AccountID, activityTime time.Time) { - if s.clock.WasStopped() { - return - } - +func (s *SeatManager) markSeatActive(seat account.SeatIndex, id iotago.AccountID, seatActivityTime time.Time) { s.activityMutex.Lock() defer s.activityMutex.Unlock() - if lastActivity, exists := s.lastActivities.Get(seat); exists && lastActivity.After(activityTime) { + if lastActivity, exists := s.lastActivities.Get(seat); (exists && lastActivity.After(seatActivityTime)) || seatActivityTime.Before(s.lastActivityTime.Add(-s.optsActivityWindow)) { return } else if !exists { s.onlineCommittee.Add(seat) s.events.OnlineCommitteeSeatAdded.Trigger(seat, id) } - s.lastActivities.Set(seat, activityTime) + s.lastActivities.Set(seat, seatActivityTime) + + s.inactivityQueue.Push(seat, seatActivityTime) + + if seatActivityTime.Before(s.lastActivityTime) { + return + } + + s.lastActivityTime = seatActivityTime + + activityThreshold := seatActivityTime.Add(-s.optsActivityWindow) + for _, inactiveSeat := range s.inactivityQueue.PopUntil(activityThreshold) { + if lastActivityForInactiveSeat, exists := s.lastActivities.Get(inactiveSeat); exists && lastActivityForInactiveSeat.After(activityThreshold) { + continue + } - s.inactivityManager.ExecuteAfter(seat, func() { s.markSeatInactive(seat) }, activityTime.Add(s.optsActivityWindow).Sub(s.clock.Accepted().RelativeTime())) + s.markSeatInactive(inactiveSeat) + } } func (s *SeatManager) markSeatInactive(seat account.SeatIndex) { - s.activityMutex.Lock() - defer s.activityMutex.Unlock() - s.lastActivities.Delete(seat) s.onlineCommittee.Delete(seat)