Fix deadlock in DRR scheduler shutdown #973

Merged on May 15, 2024 · 2 commits
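The fix, visible in the diff below, has three parts: `shutdown()` is made idempotent with an `atomic.Bool` and now releases `bufferMutex` before closing `shutdownSignal` and triggering `StoppedEvent`; event hooks and public methods return early via `IsShutdown()` instead of taking the buffer mutex after shutdown; and the validator worker loop gains the scheduler-wide shutdown signal as an exit condition. A minimal, self-contained sketch of the guard pattern (hypothetical `worker` type and names, not the scheduler's actual API):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// worker is a stand-in for a component that guards shared state with a mutex
// and must shut down without deadlocking goroutines that still use that state.
type worker struct {
	mu             sync.RWMutex
	isShutdown     atomic.Bool
	shutdownSignal chan struct{}
	buffer         []int
}

// Shutdown runs its body at most once: the atomic swap turns later calls into
// no-ops, and the mutex is released before the signal channel is closed so
// nothing waiting on the signal ever contends with a held lock.
func (w *worker) Shutdown() {
	if w.isShutdown.Swap(true) {
		return
	}

	w.mu.Lock()
	w.buffer = nil
	w.mu.Unlock()

	close(w.shutdownSignal)
}

// Enqueue bails out early after shutdown instead of taking the mutex.
func (w *worker) Enqueue(v int) {
	if w.isShutdown.Load() {
		return
	}

	w.mu.Lock()
	defer w.mu.Unlock()
	w.buffer = append(w.buffer, v)
}

func main() {
	w := &worker{shutdownSignal: make(chan struct{})}
	w.Enqueue(1)
	w.Shutdown()
	w.Shutdown()               // second call is a no-op thanks to the atomic guard
	w.Enqueue(2)               // dropped: the worker is already shut down
	fmt.Println(len(w.buffer)) // 0, the buffer was cleared on shutdown
}
```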
pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go — 75 changes: 72 additions & 3 deletions
@@ -4,6 +4,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"time"

"github.com/iotaledger/hive.go/core/safemath"
@@ -44,6 +45,8 @@ type Scheduler struct {

workersWg sync.WaitGroup
shutdownSignal chan struct{}
// isShutdown is true if the scheduler was shutdown.
isShutdown atomic.Bool

blockCache *blocks.Blocks

@@ -69,6 +72,11 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
e.Events.Notarization.LatestCommitmentUpdated.Hook(func(commitment *model.Commitment) {
// when the last slot of an epoch is committed, remove the queues of validators that are no longer in the committee.
if s.apiProvider.APIForSlot(commitment.Slot()).TimeProvider().SlotsBeforeNextEpoch(commitment.Slot()) == 0 {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

@@ -102,12 +110,22 @@ func NewProvider(opts ...options.Option[Scheduler]) module.Provider[*engine.Engi
s.selectBlockToScheduleWithLocking()
})
e.Events.Ledger.AccountCreated.Hook(func(accountID iotago.AccountID) {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

s.createIssuer(accountID)
})
e.Events.Ledger.AccountDestroyed.Hook(func(accountID iotago.AccountID) {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

@@ -142,10 +160,13 @@ func New(subModule module.Module, apiProvider iotago.APIProvider, opts ...option
}

func (s *Scheduler) shutdown() {
s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()
if s.isShutdown.Swap(true) {
return
}

s.bufferMutex.Lock()
s.validatorBuffer.Clear()
s.bufferMutex.Unlock()

close(s.shutdownSignal)

@@ -154,6 +175,10 @@ func (s *Scheduler) shutdown() {
s.StoppedEvent().Trigger()
}

func (s *Scheduler) IsShutdown() bool {
return s.isShutdown.Load()
}

// Start starts the scheduler.
func (s *Scheduler) Start() {
s.shutdownSignal = make(chan struct{}, 1)
@@ -200,13 +225,23 @@ func (s *Scheduler) MaxBufferSize() int {

// ReadyBlocksCount returns the number of ready blocks.
func (s *Scheduler) ReadyBlocksCount() int {
if s.IsShutdown() {
// if the scheduler is already shutdown, we return 0.
return 0
}

s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

return s.basicBuffer.ReadyBlocksCount()
}

func (s *Scheduler) IsBlockIssuerReady(accountID iotago.AccountID, workScores ...iotago.WorkScore) bool {
if s.IsShutdown() {
// if the scheduler is already shutdown, we return false.
return false
}

s.bufferMutex.RLock()
defer s.bufferMutex.RUnlock()

@@ -243,6 +278,11 @@ func (s *Scheduler) AddBlock(block *blocks.Block) {

// Reset resets the component to a clean state as if it was created at the last commitment.
func (s *Scheduler) Reset() {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

@@ -251,6 +291,11 @@
}

func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

@@ -289,6 +334,11 @@ func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) {
}

func (s *Scheduler) enqueueValidationBlock(block *blocks.Block) {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

@@ -338,6 +388,9 @@ loop:
for {
select {
// on close, exit the loop
case <-s.shutdownSignal:
break loop
// on close, exit the loop
case <-validatorQueue.shutdownSignal:
break loop
// when a block is pushed by this validator queue.
@@ -384,6 +437,11 @@ func (s *Scheduler) scheduleValidationBlock(block *blocks.Block, validatorQueue
}

func (s *Scheduler) selectBlockToScheduleWithLocking() {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

@@ -424,8 +482,14 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() {
})
}

start := s.basicBuffer.Current()
if start == nil {
// if there are no queues in the buffer, we cannot schedule anything
return
}

// increment the deficit for all issuers before schedulingIssuer one more time
for q := s.basicBuffer.Current(); q != schedulingIssuer; q = s.basicBuffer.Next() {
for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() {
issuerID := q.IssuerID()
newDeficit, err := s.incrementDeficit(issuerID, 1, slot)
if err != nil {
@@ -652,6 +716,11 @@ func (s *Scheduler) tryReadyValidationBlock(block *blocks.Block) {
// updateChildrenWithLocking locks the buffer mutex and iterates over the direct children of the given blockID and
// tries to mark them as ready.
func (s *Scheduler) updateChildrenWithLocking(block *blocks.Block) {
if s.IsShutdown() {
// if the scheduler is already shutdown, we don't need to do anything.
return
}

s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

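The hunk in the validator worker loop adds the scheduler-wide `shutdownSignal` as a second exit condition alongside the per-queue signal, so shutting the scheduler down no longer strands the goroutine. A hedged, self-contained sketch of that select pattern (hypothetical names and channel types; the real loop lives in `scheduler.go` and processes `*blocks.Block` values):

```go
package main

import (
	"fmt"
	"time"
)

// validatorLoop exits on either the scheduler-wide shutdown signal or its own
// queue's shutdown signal, mirroring the extra select case added in the diff.
func validatorLoop(schedulerShutdown, queueShutdown <-chan struct{}, blockCh <-chan string, done chan<- struct{}) {
loop:
	for {
		select {
		// on scheduler shutdown, exit the loop
		case <-schedulerShutdown:
			break loop
		// on queue shutdown, exit the loop
		case <-queueShutdown:
			break loop
		// when a block is pushed by this queue
		case b := <-blockCh:
			fmt.Println("scheduling", b)
		}
	}
	close(done)
}

func main() {
	schedulerShutdown := make(chan struct{})
	queueShutdown := make(chan struct{})
	blockCh := make(chan string)
	done := make(chan struct{})

	go validatorLoop(schedulerShutdown, queueShutdown, blockCh, done)

	blockCh <- "block-1"
	close(schedulerShutdown) // scheduler-wide shutdown: the worker exits

	select {
	case <-done:
		fmt.Println("worker stopped cleanly")
	case <-time.After(time.Second):
		fmt.Println("worker did not stop") // the pre-fix behavior this case guards against
	}
}
```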