diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go index c283bbaba..86dc0dac6 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go @@ -6,16 +6,12 @@ import ( "time" "github.com/iotaledger/hive.go/ds/shrinkingmap" - "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" iotago "github.com/iotaledger/iota.go/v4" ) -// ErrInsufficientMana is returned when the mana is insufficient. -var ErrInsufficientMana = ierrors.New("insufficient issuer's mana to schedule the block") - // region BufferQueue ///////////////////////////////////////////////////////////////////////////////////////////// // BufferQueue represents a buffer of IssuerQueue. @@ -65,6 +61,26 @@ func (b *BufferQueue) IssuerQueue(issuerID iotago.AccountID) *IssuerQueue { return issuerQueue } +// IssuerQueueWork returns the total WorkScore of block in the queue for the corresponding issuer. +func (b *BufferQueue) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore { + issuerQueue := b.IssuerQueue(issuerID) + if issuerQueue == nil { + return 0 + } + + return issuerQueue.Work() +} + +// IssuerQueueSize returns the number of blocks in the queue for the corresponding issuer. +func (b *BufferQueue) IssuerQueueBlockCount(issuerID iotago.AccountID) int { + issuerQueue := b.IssuerQueue(issuerID) + if issuerQueue == nil { + return 0 + } + + return issuerQueue.Size() +} + func (b *BufferQueue) CreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue { issuerQueue := NewIssuerQueue(issuerID) b.activeIssuers.Set(issuerID, b.ringInsert(issuerQueue)) @@ -72,24 +88,56 @@ func (b *BufferQueue) CreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue return issuerQueue } -func (b *BufferQueue) GetIssuerQueue(issuerID iotago.AccountID) (*IssuerQueue, error) { +func (b *BufferQueue) GetOrCreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue { element, issuerActive := b.activeIssuers.Get(issuerID) if !issuerActive { - return nil, ierrors.New("issuer queue does not exist") + // create new issuer queue + return b.CreateIssuerQueue(issuerID) } issuerQueue, isIQ := element.Value.(*IssuerQueue) if !isIQ { - return nil, ierrors.New("buffer contains elements that are not issuer queues") + panic("buffer contains elements that are not issuer queues") } - // issuer queue exists - return issuerQueue, nil + return issuerQueue +} + +// RemoveIssuerQueue removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue. +func (b *BufferQueue) RemoveIssuerQueue(issuerID iotago.AccountID) { + element, ok := b.activeIssuers.Get(issuerID) + if !ok { + return + } + issuerQueue, isIQ := element.Value.(*IssuerQueue) + if !isIQ { + panic("buffer contains elements that are not issuer queues") + } + b.size -= issuerQueue.Size() + + b.ringRemove(element) + b.activeIssuers.Delete(issuerID) +} + +// RemoveIssuerQueueIfEmpty removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue if it is empty. +func (b *BufferQueue) RemoveIssuerQueueIfEmpty(issuerID iotago.AccountID) { + element, ok := b.activeIssuers.Get(issuerID) + if !ok { + return + } + issuerQueue, isIQ := element.Value.(*IssuerQueue) + if !isIQ { + panic("buffer contains elements that are not issuer queues") + } + + if issuerQueue.Size() == 0 { + b.ringRemove(element) + b.activeIssuers.Delete(issuerID) + } } // Submit submits a block. Return blocks dropped from the scheduler to make room for the submitted block. // The submitted block can also be returned as dropped if the issuer does not have enough mana. func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) ([]*blocks.Block, bool) { - // first we submit the block, and if it turns out that the issuer doesn't have enough bandwidth to submit, it will be removed by dropTail if !issuerQueue.Submit(blk) { return nil, false @@ -105,45 +153,6 @@ func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu return nil, true } -func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) { - start := b.Current() - ringStart := b.ring - // remove as many blocks as necessary to stay within max buffer size - for b.Size() > maxBuffer { - // TODO: extract to util func - // find longest mana-scaled queue - maxScale := math.Inf(-1) - var maxIssuerID iotago.AccountID - for q := start; ; { - if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 { - if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale { - maxScale = scale - maxIssuerID = q.IssuerID() - } - } else if q.Size() > 0 { - maxIssuerID = q.IssuerID() - // return to the start of the issuer ring and break as this is the max value we can have. - b.ring = ringStart - - break - } - q = b.Next() - if q == start { - break - } - } - - if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil { - if tail := longestQueue.RemoveTail(); tail != nil { - b.size-- - droppedBlocks = append(droppedBlocks, tail) - } - } - } - - return droppedBlocks -} - // Unsubmit removes a block from the submitted blocks. // If that block is already marked as ready, Unsubmit has no effect. func (b *BufferQueue) Unsubmit(block *blocks.Block) bool { @@ -209,34 +218,6 @@ func (b *BufferQueue) TotalBlocksCount() (blocksCount int) { return } -// InsertIssuer creates a queue for the given issuer and adds it to the list of active issuers. -func (b *BufferQueue) InsertIssuer(issuerID iotago.AccountID) { - _, issuerActive := b.activeIssuers.Get(issuerID) - if issuerActive { - return - } - - issuerQueue := NewIssuerQueue(issuerID) - b.activeIssuers.Set(issuerID, b.ringInsert(issuerQueue)) -} - -// RemoveIssuer removes all blocks (submitted and ready) for the given issuer. -func (b *BufferQueue) RemoveIssuer(issuerID iotago.AccountID) { - element, ok := b.activeIssuers.Get(issuerID) - if !ok { - return - } - - issuerQueue, isIQ := element.Value.(*IssuerQueue) - if !isIQ { - return - } - b.size -= issuerQueue.Size() - - b.ringRemove(element) - b.activeIssuers.Delete(issuerID) -} - // Next returns the next IssuerQueue in round-robin order. func (b *BufferQueue) Next() *IssuerQueue { if b.ring != nil { @@ -297,6 +278,49 @@ func (b *BufferQueue) IssuerIDs() []iotago.AccountID { return issuerIDs } +func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) { + // remove as many blocks as necessary to stay within max buffer size + for b.Size() > maxBuffer { + // find the longest mana-scaled queue + maxIssuerID := b.longestQueueIssuerID(quantumFunc) + if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil { + if tail := longestQueue.RemoveTail(); tail != nil { + b.size-- + droppedBlocks = append(droppedBlocks, tail) + } + } + } + + return droppedBlocks +} + +func (b *BufferQueue) longestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID { + start := b.Current() + ringStart := b.ring + maxScale := math.Inf(-1) + var maxIssuerID iotago.AccountID + for q := start; ; { + if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 { + if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale { + maxScale = scale + maxIssuerID = q.IssuerID() + } + } else if q.Size() > 0 { + // if the issuer has no quantum, then this is the max queue size + maxIssuerID = q.IssuerID() + b.ring = ringStart + + break + } + q = b.Next() + if q == start { + break + } + } + + return maxIssuerID +} + func (b *BufferQueue) ringRemove(r *ring.Ring) { n := b.ring.Next() if r == b.ring { diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index d688c03c1..917787009 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -154,12 +154,12 @@ func (s *Scheduler) Start() { s.TriggerInitialized() } -// IssuerQueueSizeCount returns the number of blocks in the queue of the given issuer. +// IssuerQueueBlockCount returns the number of blocks in the queue of the given issuer. func (s *Scheduler) IssuerQueueBlockCount(issuerID iotago.AccountID) int { s.bufferMutex.RLock() defer s.bufferMutex.RUnlock() - return s.basicBuffer.IssuerQueue(issuerID).Size() + return s.basicBuffer.IssuerQueueBlockCount(issuerID) } // IssuerQueueWork returns the queue size of the given issuer in work units. @@ -167,7 +167,7 @@ func (s *Scheduler) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore s.bufferMutex.RLock() defer s.bufferMutex.RUnlock() - return s.basicBuffer.IssuerQueue(issuerID).Work() + return s.basicBuffer.IssuerQueueWork(issuerID) } // ValidatorQueueBlockCount returns the number of validation blocks in the validator queue of the given issuer. @@ -233,7 +233,7 @@ func (s *Scheduler) IsBlockIssuerReady(accountID iotago.AccountID, blocks ...*bl return false } - return deficit >= s.deficitFromWork(work+s.basicBuffer.IssuerQueue(accountID).Work()) + return deficit >= s.deficitFromWork(work+s.basicBuffer.IssuerQueueWork(accountID)) } func (s *Scheduler) AddBlock(block *blocks.Block) { @@ -251,17 +251,7 @@ func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) { slot := s.latestCommittedSlot() issuerID := block.ProtocolBlock().Header.IssuerID - issuerQueue, err := s.basicBuffer.GetIssuerQueue(issuerID) - if err != nil { - // this should only ever happen if the issuer has been removed due to insufficient Mana. - // if Mana is now sufficient again, we can add the issuer again. - _, quantumErr := s.quantumFunc(issuerID, slot) - if quantumErr != nil { - s.errorHandler(ierrors.Wrapf(quantumErr, "failed to retrieve quantum for issuerID %s in slot %d when adding a block", issuerID, slot)) - } - - issuerQueue = s.createIssuer(issuerID) - } + issuerQueue := s.getOrCreateIssuer(issuerID) droppedBlocks, submitted := s.basicBuffer.Submit( block, @@ -443,7 +433,7 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() { // increment every issuer's deficit for the required number of rounds for q := start; ; { issuerID := q.IssuerID() - if err := s.incrementDeficit(issuerID, rounds, slot); err != nil { + if _, err := s.incrementDeficit(issuerID, rounds, slot); err != nil { s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot)) s.removeIssuer(issuerID, err) @@ -462,20 +452,24 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() { // increment the deficit for all issuers before schedulingIssuer one more time for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() { issuerID := q.IssuerID() - if err := s.incrementDeficit(issuerID, 1, slot); err != nil { + newDeficit, err := s.incrementDeficit(issuerID, 1, slot) + if err != nil { s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot)) s.removeIssuer(issuerID, err) return } + + // remove empty issuer queues of issuers with max deficit. + if newDeficit == s.maxDeficit() { + s.basicBuffer.RemoveIssuerQueueIfEmpty(issuerID) + } } // remove the block from the buffer and adjust issuer's deficit block := s.basicBuffer.PopFront() issuerID := block.ProtocolBlock().Header.IssuerID - err := s.updateDeficit(issuerID, -s.deficitFromWork(block.WorkScore())) - - if err != nil { + if _, err := s.updateDeficit(issuerID, -s.deficitFromWork(block.WorkScore())); err != nil { // if something goes wrong with deficit update, drop the block instead of scheduling it. block.SetDropped() s.events.BlockDropped.Trigger(block, err) @@ -576,8 +570,14 @@ func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) { } s.deficits.Delete(issuerID) + s.basicBuffer.RemoveIssuerQueue(issuerID) +} - s.basicBuffer.RemoveIssuer(issuerID) +func (s *Scheduler) getOrCreateIssuer(accountID iotago.AccountID) *IssuerQueue { + issuerQueue := s.basicBuffer.GetOrCreateIssuerQueue(accountID) + s.deficits.GetOrCreate(accountID, func() Deficit { return 0 }) + + return issuerQueue } func (s *Scheduler) createIssuer(accountID iotago.AccountID) *IssuerQueue { @@ -587,16 +587,16 @@ func (s *Scheduler) createIssuer(accountID iotago.AccountID) *IssuerQueue { return issuerQueue } -func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) error { +func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) (Deficit, error) { var updateErr error - s.deficits.Compute(accountID, func(currentValue Deficit, exists bool) Deficit { + updatedDeficit := s.deficits.Compute(accountID, func(currentValue Deficit, exists bool) Deficit { if !exists { updateErr = ierrors.Errorf("could not get deficit for issuer %s", accountID) return 0 } newDeficit, err := safemath.SafeAdd(currentValue, delta) - if err != nil { - // It can only overflow. We never allow the value to go below 0, so underflow is impossible. + // It can only overflow. We never allow the value to go below 0, so underflow is impossible. + if err != nil || newDeficit >= s.maxDeficit() { return s.maxDeficit() } @@ -606,22 +606,22 @@ func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) err return 0 } - return lo.Min(newDeficit, s.maxDeficit()) + return newDeficit }) if updateErr != nil { s.removeIssuer(accountID, updateErr) - return updateErr + return 0, updateErr } - return nil + return updatedDeficit, nil } -func (s *Scheduler) incrementDeficit(issuerID iotago.AccountID, rounds Deficit, slot iotago.SlotIndex) error { +func (s *Scheduler) incrementDeficit(issuerID iotago.AccountID, rounds Deficit, slot iotago.SlotIndex) (Deficit, error) { quantum, err := s.quantumFunc(issuerID, slot) if err != nil { - return ierrors.Wrap(err, "failed to retrieve quantum") + return 0, ierrors.Wrap(err, "failed to retrieve quantum") } delta, err := safemath.SafeMul(quantum, rounds)