Skip to content

Commit

Permalink
Merge pull request #373 from iotaledger/fix/activity-leak
Browse files Browse the repository at this point in the history
Do not depend on acceptance time to mark inactive seats
  • Loading branch information
karimodm authored Sep 27, 2023
2 parents 67756f4 + 762fe89 commit 647fac6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 35 deletions.
4 changes: 1 addition & 3 deletions pkg/model/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
)

type Block struct {
api iotago.API

blockID iotago.BlockID

data []byte
Expand Down Expand Up @@ -107,7 +105,7 @@ func (blk *Block) ValidationBlock() (validationBlock *iotago.ValidationBlock, is
}

func (blk *Block) String() string {
encode, err := blk.api.JSONEncode(blk.ProtocolBlock())
encode, err := blk.protocolBlock.API.JSONEncode(blk.ProtocolBlock())
if err != nil {
panic(err)
}
Expand Down
68 changes: 36 additions & 32 deletions pkg/protocol/sybilprotection/seatmanager/poa/poa.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (
type SeatManager struct {
events *seatmanager.Events

clock clock.Clock
timeProviderFunc func() *iotago.TimeProvider
workers *workerpool.Group
accounts *account.Accounts
committee *account.SeatedAccounts
onlineCommittee ds.Set[account.SeatIndex]
inactivityManager *timed.TaskExecutor[account.SeatIndex]
lastActivities *shrinkingmap.ShrinkingMap[account.SeatIndex, time.Time]
activityMutex syncutils.RWMutex
committeeMutex syncutils.RWMutex
clock clock.Clock
timeProviderFunc func() *iotago.TimeProvider
workers *workerpool.Group
accounts *account.Accounts
committee *account.SeatedAccounts
onlineCommittee ds.Set[account.SeatIndex]
inactivityQueue timed.PriorityQueue[account.SeatIndex]
lastActivities *shrinkingmap.ShrinkingMap[account.SeatIndex, time.Time]
lastActivityTime time.Time
activityMutex syncutils.RWMutex
committeeMutex syncutils.RWMutex

optsActivityWindow time.Duration
optsOnlineCommitteeStartup []iotago.AccountID
Expand All @@ -44,12 +45,12 @@ func NewProvider(opts ...options.Option[SeatManager]) module.Provider[*engine.En
return module.Provide(func(e *engine.Engine) seatmanager.SeatManager {
return options.Apply(
&SeatManager{
events: seatmanager.NewEvents(),
workers: e.Workers.CreateGroup("SeatManager"),
accounts: account.NewAccounts(),
onlineCommittee: ds.NewSet[account.SeatIndex](),
inactivityManager: timed.NewTaskExecutor[account.SeatIndex](1),
lastActivities: shrinkingmap.New[account.SeatIndex, time.Time](),
events: seatmanager.NewEvents(),
workers: e.Workers.CreateGroup("SeatManager"),
accounts: account.NewAccounts(),
onlineCommittee: ds.NewSet[account.SeatIndex](),
inactivityQueue: timed.NewPriorityQueue[account.SeatIndex](true),
lastActivities: shrinkingmap.New[account.SeatIndex, time.Time](),

optsActivityWindow: time.Second * 30,
}, opts, func(s *SeatManager) {
Expand Down Expand Up @@ -115,7 +116,6 @@ func (s *SeatManager) SeatCount() int {

func (s *SeatManager) Shutdown() {
s.TriggerStopped()
s.stopInactivityManager()
s.workers.Shutdown()
}

Expand Down Expand Up @@ -152,34 +152,38 @@ func (s *SeatManager) SetCommittee(_ iotago.EpochIndex, validators *account.Acco
s.committee = s.accounts.SelectCommittee(validators.IDs()...)
}

func (s *SeatManager) stopInactivityManager() {
s.inactivityManager.Shutdown(timed.CancelPendingElements)
}

func (s *SeatManager) markSeatActive(seat account.SeatIndex, id iotago.AccountID, activityTime time.Time) {
if s.clock.WasStopped() {
return
}

func (s *SeatManager) markSeatActive(seat account.SeatIndex, id iotago.AccountID, seatActivityTime time.Time) {
s.activityMutex.Lock()
defer s.activityMutex.Unlock()

if lastActivity, exists := s.lastActivities.Get(seat); exists && lastActivity.After(activityTime) {
if lastActivity, exists := s.lastActivities.Get(seat); (exists && lastActivity.After(seatActivityTime)) || seatActivityTime.Before(s.lastActivityTime.Add(-s.optsActivityWindow)) {
return
} else if !exists {
s.onlineCommittee.Add(seat)
s.events.OnlineCommitteeSeatAdded.Trigger(seat, id)
}

s.lastActivities.Set(seat, activityTime)
s.lastActivities.Set(seat, seatActivityTime)

s.inactivityQueue.Push(seat, seatActivityTime)

if seatActivityTime.Before(s.lastActivityTime) {
return
}

s.lastActivityTime = seatActivityTime

activityThreshold := seatActivityTime.Add(-s.optsActivityWindow)
for _, inactiveSeat := range s.inactivityQueue.PopUntil(activityThreshold) {
if lastActivityForInactiveSeat, exists := s.lastActivities.Get(inactiveSeat); exists && lastActivityForInactiveSeat.After(activityThreshold) {
continue
}

s.inactivityManager.ExecuteAfter(seat, func() { s.markSeatInactive(seat) }, activityTime.Add(s.optsActivityWindow).Sub(s.clock.Accepted().RelativeTime()))
s.markSeatInactive(inactiveSeat)
}
}

func (s *SeatManager) markSeatInactive(seat account.SeatIndex) {
s.activityMutex.Lock()
defer s.activityMutex.Unlock()

s.lastActivities.Delete(seat)
s.onlineCommittee.Delete(seat)

Expand Down

0 comments on commit 647fac6

Please sign in to comment.