From 66cbf76d742979b49cccf70265d5d46372a0161c Mon Sep 17 00:00:00 2001 From: Andrew Date: Tue, 17 Oct 2023 10:40:34 +0100 Subject: [PATCH 1/3] extract longest queue finding to utility function --- .../scheduler/drr/drrbuffer.go | 54 ++++++++++--------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go index 0fa45bc4d..aa251e8d9 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go @@ -106,33 +106,10 @@ func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu } func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) { - start := b.Current() - ringStart := b.ring // remove as many blocks as necessary to stay within max buffer size for b.Size() > maxBuffer { - // TODO: extract to util func - // find longest mana-scaled queue - maxScale := math.Inf(-1) - var maxIssuerID iotago.AccountID - for q := start; ; { - if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 { - if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale { - maxScale = scale - maxIssuerID = q.IssuerID() - } - } else if q.Size() > 0 { - maxIssuerID = q.IssuerID() - // return to the start of the issuer ring and break as this is the max value we can have. - b.ring = ringStart - - break - } - q = b.Next() - if q == start { - break - } - } - + // find the longest mana-scaled queue + maxIssuerID := b.longestQueueIssuerID(quantumFunc) if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil { if tail := longestQueue.RemoveTail(); tail != nil { b.size-- @@ -144,6 +121,33 @@ func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBu return droppedBlocks } +func (b *BufferQueue) longestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID { + start := b.Current() + ringStart := b.ring + maxScale := math.Inf(-1) + var maxIssuerID iotago.AccountID + for q := start; ; { + if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 { + if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale { + maxScale = scale + maxIssuerID = q.IssuerID() + } + } else if q.Size() > 0 { + // if the issuer has no quantum, then this is the max queue size + maxIssuerID = q.IssuerID() + b.ring = ringStart + + break + } + q = b.Next() + if q == start { + break + } + } + + return maxIssuerID +} + // Unsubmit removes a block from the submitted blocks. // If that block is already marked as ready, Unsubmit has no effect. func (b *BufferQueue) Unsubmit(block *blocks.Block) bool { From e92d30b588fbc8512bd4e15170f2c2f18b944e8e Mon Sep 17 00:00:00 2001 From: Andrew Date: Tue, 17 Oct 2023 13:14:11 +0100 Subject: [PATCH 2/3] remove issuer queues with max deficit and no blocks --- .../scheduler/drr/drrbuffer.go | 182 ++++++++++-------- .../scheduler/drr/scheduler.go | 62 +++--- pkg/tests/booker_test.go | 2 +- 3 files changed, 134 insertions(+), 112 deletions(-) diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go index aa251e8d9..99bcc27f3 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go @@ -6,16 +6,12 @@ import ( "time" "github.com/iotaledger/hive.go/ds/shrinkingmap" - "github.com/iotaledger/hive.go/ierrors" "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/iota-core/pkg/protocol/engine/blocks" iotago "github.com/iotaledger/iota.go/v4" ) -// ErrInsufficientMana is returned when the mana is insufficient. -var ErrInsufficientMana = ierrors.New("insufficient issuer's mana to schedule the block") - // region BufferQueue ///////////////////////////////////////////////////////////////////////////////////////////// // BufferQueue represents a buffer of IssuerQueue. @@ -65,6 +61,26 @@ func (b *BufferQueue) IssuerQueue(issuerID iotago.AccountID) *IssuerQueue { return issuerQueue } +// IssuerQueueWork returns the total WorkScore of block in the queue for the corresponding issuer. +func (b *BufferQueue) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore { + issuerQueue := b.IssuerQueue(issuerID) + if issuerQueue == nil { + return 0 + } + + return issuerQueue.Work() +} + +// IssuerQueueSize returns the number of blocks in the queue for the corresponding issuer. +func (b *BufferQueue) IssuerQueueBlockCount(issuerID iotago.AccountID) int { + issuerQueue := b.IssuerQueue(issuerID) + if issuerQueue == nil { + return 0 + } + + return issuerQueue.Size() +} + func (b *BufferQueue) CreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue { issuerQueue := NewIssuerQueue(issuerID) b.activeIssuers.Set(issuerID, b.ringInsert(issuerQueue)) @@ -72,24 +88,56 @@ func (b *BufferQueue) CreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue return issuerQueue } -func (b *BufferQueue) GetIssuerQueue(issuerID iotago.AccountID) (*IssuerQueue, error) { +func (b *BufferQueue) GetOrCreateIssuerQueue(issuerID iotago.AccountID) (*IssuerQueue, bool) { element, issuerActive := b.activeIssuers.Get(issuerID) if !issuerActive { - return nil, ierrors.New("issuer queue does not exist") + // create new issuer queue + return b.CreateIssuerQueue(issuerID), true } issuerQueue, isIQ := element.Value.(*IssuerQueue) if !isIQ { - return nil, ierrors.New("buffer contains elements that are not issuer queues") + panic("buffer contains elements that are not issuer queues") } - // issuer queue exists - return issuerQueue, nil + return issuerQueue, false +} + +// RemoveIssuerQueue removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue. +func (b *BufferQueue) RemoveIssuerQueue(issuerID iotago.AccountID) { + element, ok := b.activeIssuers.Get(issuerID) + if !ok { + return + } + issuerQueue, isIQ := element.Value.(*IssuerQueue) + if !isIQ { + panic("buffer contains elements that are not issuer queues") + } + b.size -= issuerQueue.Size() + + b.ringRemove(element) + b.activeIssuers.Delete(issuerID) +} + +// RemoveIssuerQueueIfEmpty removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue if it is empty. +func (b *BufferQueue) RemoveIssuerQueueIfEmpty(issuerID iotago.AccountID) { + element, ok := b.activeIssuers.Get(issuerID) + if !ok { + return + } + issuerQueue, isIQ := element.Value.(*IssuerQueue) + if !isIQ { + panic("buffer contains elements that are not issuer queues") + } + + if issuerQueue.Size() == 0 { + b.ringRemove(element) + b.activeIssuers.Delete(issuerID) + } } // Submit submits a block. Return blocks dropped from the scheduler to make room for the submitted block. // The submitted block can also be returned as dropped if the issuer does not have enough mana. func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) ([]*blocks.Block, bool) { - // first we submit the block, and if it turns out that the issuer doesn't have enough bandwidth to submit, it will be removed by dropTail if !issuerQueue.Submit(blk) { return nil, false @@ -105,49 +153,6 @@ func (b *BufferQueue) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu return nil, true } -func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) { - // remove as many blocks as necessary to stay within max buffer size - for b.Size() > maxBuffer { - // find the longest mana-scaled queue - maxIssuerID := b.longestQueueIssuerID(quantumFunc) - if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil { - if tail := longestQueue.RemoveTail(); tail != nil { - b.size-- - droppedBlocks = append(droppedBlocks, tail) - } - } - } - - return droppedBlocks -} - -func (b *BufferQueue) longestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID { - start := b.Current() - ringStart := b.ring - maxScale := math.Inf(-1) - var maxIssuerID iotago.AccountID - for q := start; ; { - if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 { - if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale { - maxScale = scale - maxIssuerID = q.IssuerID() - } - } else if q.Size() > 0 { - // if the issuer has no quantum, then this is the max queue size - maxIssuerID = q.IssuerID() - b.ring = ringStart - - break - } - q = b.Next() - if q == start { - break - } - } - - return maxIssuerID -} - // Unsubmit removes a block from the submitted blocks. // If that block is already marked as ready, Unsubmit has no effect. func (b *BufferQueue) Unsubmit(block *blocks.Block) bool { @@ -213,34 +218,6 @@ func (b *BufferQueue) TotalBlocksCount() (blocksCount int) { return } -// InsertIssuer creates a queue for the given issuer and adds it to the list of active issuers. -func (b *BufferQueue) InsertIssuer(issuerID iotago.AccountID) { - _, issuerActive := b.activeIssuers.Get(issuerID) - if issuerActive { - return - } - - issuerQueue := NewIssuerQueue(issuerID) - b.activeIssuers.Set(issuerID, b.ringInsert(issuerQueue)) -} - -// RemoveIssuer removes all blocks (submitted and ready) for the given issuer. -func (b *BufferQueue) RemoveIssuer(issuerID iotago.AccountID) { - element, ok := b.activeIssuers.Get(issuerID) - if !ok { - return - } - - issuerQueue, isIQ := element.Value.(*IssuerQueue) - if !isIQ { - return - } - b.size -= issuerQueue.Size() - - b.ringRemove(element) - b.activeIssuers.Delete(issuerID) -} - // Next returns the next IssuerQueue in round-robin order. func (b *BufferQueue) Next() *IssuerQueue { if b.ring != nil { @@ -301,6 +278,49 @@ func (b *BufferQueue) IssuerIDs() []iotago.AccountID { return issuerIDs } +func (b *BufferQueue) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBuffer int) (droppedBlocks []*blocks.Block) { + // remove as many blocks as necessary to stay within max buffer size + for b.Size() > maxBuffer { + // find the longest mana-scaled queue + maxIssuerID := b.longestQueueIssuerID(quantumFunc) + if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil { + if tail := longestQueue.RemoveTail(); tail != nil { + b.size-- + droppedBlocks = append(droppedBlocks, tail) + } + } + } + + return droppedBlocks +} + +func (b *BufferQueue) longestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID { + start := b.Current() + ringStart := b.ring + maxScale := math.Inf(-1) + var maxIssuerID iotago.AccountID + for q := start; ; { + if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 { + if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale { + maxScale = scale + maxIssuerID = q.IssuerID() + } + } else if q.Size() > 0 { + // if the issuer has no quantum, then this is the max queue size + maxIssuerID = q.IssuerID() + b.ring = ringStart + + break + } + q = b.Next() + if q == start { + break + } + } + + return maxIssuerID +} + func (b *BufferQueue) ringRemove(r *ring.Ring) { n := b.ring.Next() if r == b.ring { diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index 8eb606cd3..31fa7ccb1 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -148,12 +148,12 @@ func (s *Scheduler) Start() { s.TriggerInitialized() } -// IssuerQueueSizeCount returns the number of blocks in the queue of the given issuer. +// IssuerQueueBlockCount returns the number of blocks in the queue of the given issuer. func (s *Scheduler) IssuerQueueBlockCount(issuerID iotago.AccountID) int { s.bufferMutex.RLock() defer s.bufferMutex.RUnlock() - return s.basicBuffer.IssuerQueue(issuerID).Size() + return s.basicBuffer.IssuerQueueBlockCount(issuerID) } // IssuerQueueWork returns the queue size of the given issuer in work units. @@ -161,7 +161,7 @@ func (s *Scheduler) IssuerQueueWork(issuerID iotago.AccountID) iotago.WorkScore s.bufferMutex.RLock() defer s.bufferMutex.RUnlock() - return s.basicBuffer.IssuerQueue(issuerID).Work() + return s.basicBuffer.IssuerQueueWork(issuerID) } // ValidatorQueueBlockCount returns the number of validation blocks in the validator queue of the given issuer. @@ -227,7 +227,7 @@ func (s *Scheduler) IsBlockIssuerReady(accountID iotago.AccountID, blocks ...*bl return false } - return deficit >= s.deficitFromWork(work+s.basicBuffer.IssuerQueue(accountID).Work()) + return deficit >= s.deficitFromWork(work+s.basicBuffer.IssuerQueueWork(accountID)) } func (s *Scheduler) AddBlock(block *blocks.Block) { @@ -245,17 +245,7 @@ func (s *Scheduler) enqueueBasicBlock(block *blocks.Block) { slot := s.latestCommittedSlot() issuerID := block.ProtocolBlock().IssuerID - issuerQueue, err := s.basicBuffer.GetIssuerQueue(issuerID) - if err != nil { - // this should only ever happen if the issuer has been removed due to insufficient Mana. - // if Mana is now sufficient again, we can add the issuer again. - _, quantumErr := s.quantumFunc(issuerID, slot) - if quantumErr != nil { - s.errorHandler(ierrors.Wrapf(quantumErr, "failed to retrieve quantum for issuerID %s in slot %d when adding a block", issuerID, slot)) - } - - issuerQueue = s.createIssuer(issuerID) - } + issuerQueue := s.getOrCreateIssuer(issuerID) droppedBlocks, submitted := s.basicBuffer.Submit( block, @@ -437,7 +427,7 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() { // increment every issuer's deficit for the required number of rounds for q := start; ; { issuerID := q.IssuerID() - if err := s.incrementDeficit(issuerID, rounds, slot); err != nil { + if _, err := s.incrementDeficit(issuerID, rounds, slot); err != nil { s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot)) s.removeIssuer(issuerID, err) @@ -456,20 +446,24 @@ func (s *Scheduler) selectBasicBlockWithoutLocking() { // increment the deficit for all issuers before schedulingIssuer one more time for q := start; q != schedulingIssuer; q = s.basicBuffer.Next() { issuerID := q.IssuerID() - if err := s.incrementDeficit(issuerID, 1, slot); err != nil { + newDeficit, err := s.incrementDeficit(issuerID, 1, slot) + if err != nil { s.errorHandler(ierrors.Wrapf(err, "failed to increment deficit for issuerID %s in slot %d", issuerID, slot)) s.removeIssuer(issuerID, err) return } + + // remove empty issuer queues of issuers with max deficit. + if newDeficit == s.maxDeficit() { + s.basicBuffer.RemoveIssuerQueueIfEmpty(issuerID) + } } // remove the block from the buffer and adjust issuer's deficit block := s.basicBuffer.PopFront() issuerID := block.ProtocolBlock().IssuerID - err := s.updateDeficit(issuerID, -s.deficitFromWork(block.WorkScore())) - - if err != nil { + if _, err := s.updateDeficit(issuerID, -s.deficitFromWork(block.WorkScore())); err != nil { // if something goes wrong with deficit update, drop the block instead of scheduling it. block.SetDropped() s.events.BlockDropped.Trigger(block, err) @@ -570,8 +564,16 @@ func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) { } s.deficits.Delete(issuerID) + s.basicBuffer.RemoveIssuerQueue(issuerID) +} - s.basicBuffer.RemoveIssuer(issuerID) +func (s *Scheduler) getOrCreateIssuer(accountID iotago.AccountID) *IssuerQueue { + issuerQueue, created := s.basicBuffer.GetOrCreateIssuerQueue(accountID) + if created { + s.deficits.Set(accountID, 0) + } + + return issuerQueue } func (s *Scheduler) createIssuer(accountID iotago.AccountID) *IssuerQueue { @@ -581,16 +583,16 @@ func (s *Scheduler) createIssuer(accountID iotago.AccountID) *IssuerQueue { return issuerQueue } -func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) error { +func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) (Deficit, error) { var updateErr error - s.deficits.Compute(accountID, func(currentValue Deficit, exists bool) Deficit { + updatedDeficit := s.deficits.Compute(accountID, func(currentValue Deficit, exists bool) Deficit { if !exists { updateErr = ierrors.Errorf("could not get deficit for issuer %s", accountID) return 0 } newDeficit, err := safemath.SafeAdd(currentValue, delta) - if err != nil { - // It can only overflow. We never allow the value to go below 0, so underflow is impossible. + // It can only overflow. We never allow the value to go below 0, so underflow is impossible. + if err != nil || newDeficit >= s.maxDeficit() { return s.maxDeficit() } @@ -600,22 +602,22 @@ func (s *Scheduler) updateDeficit(accountID iotago.AccountID, delta Deficit) err return 0 } - return lo.Min(newDeficit, s.maxDeficit()) + return newDeficit }) if updateErr != nil { s.removeIssuer(accountID, updateErr) - return updateErr + return 0, updateErr } - return nil + return updatedDeficit, nil } -func (s *Scheduler) incrementDeficit(issuerID iotago.AccountID, rounds Deficit, slot iotago.SlotIndex) error { +func (s *Scheduler) incrementDeficit(issuerID iotago.AccountID, rounds Deficit, slot iotago.SlotIndex) (Deficit, error) { quantum, err := s.quantumFunc(issuerID, slot) if err != nil { - return err + return 0, err } delta, err := safemath.SafeMul(quantum, rounds) diff --git a/pkg/tests/booker_test.go b/pkg/tests/booker_test.go index ea4234537..e667664e3 100644 --- a/pkg/tests/booker_test.go +++ b/pkg/tests/booker_test.go @@ -125,7 +125,7 @@ func Test_MultipleAttachments(t *testing.T) { nodeA := ts.AddValidatorNode("nodeA") nodeB := ts.AddValidatorNode("nodeB") blockIssuerA := ts.AddBasicBlockIssuer("blockIssuerA") - blockIssuerB := ts.AddBasicBlockIssuer("blockIssuerA") + blockIssuerB := ts.AddBasicBlockIssuer("blockIssuerB") ts.Run(true, map[string][]options.Option[protocol.Protocol]{}) From 2a61a1db7dba4917b83e5afa89d1439206e754de Mon Sep 17 00:00:00 2001 From: Andrew Date: Thu, 26 Oct 2023 11:49:49 +0100 Subject: [PATCH 3/3] fix bug with deficit initialisation for removed queues --- .../engine/congestioncontrol/scheduler/drr/drrbuffer.go | 6 +++--- .../engine/congestioncontrol/scheduler/drr/scheduler.go | 6 ++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go index 99bcc27f3..3ae38379a 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/drrbuffer.go @@ -88,18 +88,18 @@ func (b *BufferQueue) CreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue return issuerQueue } -func (b *BufferQueue) GetOrCreateIssuerQueue(issuerID iotago.AccountID) (*IssuerQueue, bool) { +func (b *BufferQueue) GetOrCreateIssuerQueue(issuerID iotago.AccountID) *IssuerQueue { element, issuerActive := b.activeIssuers.Get(issuerID) if !issuerActive { // create new issuer queue - return b.CreateIssuerQueue(issuerID), true + return b.CreateIssuerQueue(issuerID) } issuerQueue, isIQ := element.Value.(*IssuerQueue) if !isIQ { panic("buffer contains elements that are not issuer queues") } - return issuerQueue, false + return issuerQueue } // RemoveIssuerQueue removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue. diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index 31fa7ccb1..b6140cc32 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -568,10 +568,8 @@ func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) { } func (s *Scheduler) getOrCreateIssuer(accountID iotago.AccountID) *IssuerQueue { - issuerQueue, created := s.basicBuffer.GetOrCreateIssuerQueue(accountID) - if created { - s.deficits.Set(accountID, 0) - } + issuerQueue := s.basicBuffer.GetOrCreateIssuerQueue(accountID) + s.deficits.GetOrCreate(accountID, func() Deficit { return 0 }) return issuerQueue }