diff --git a/.golangci.yml b/.golangci.yml index a05e86b39..b17b71756 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -92,7 +92,7 @@ linters: - importas - inamedparam #- interfacebloat - - intrange + #- intrange # TODO: re-enable after https://github.com/ckaznocha/intrange v0.1.2 release is merged in golangci-lint #- ireturn #- lll - loggercheck diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go index c284fff4e..fabd1e72c 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go @@ -64,13 +64,13 @@ func (b *BasicBuffer) Size() int { // IssuerQueue returns the queue for the corresponding issuer. func (b *BasicBuffer) IssuerQueue(issuerID iotago.AccountID) *IssuerQueue { - element, ok := b.activeIssuers.Get(issuerID) - if !ok { + element, exists := b.activeIssuers.Get(issuerID) + if !exists { return nil } issuerQueue, isIQ := element.Value.(*IssuerQueue) if !isIQ { - return nil + panic("buffer contains elements that are not issuer queues") } return issuerQueue @@ -119,8 +119,8 @@ func (b *BasicBuffer) GetOrCreateIssuerQueue(issuerID iotago.AccountID) *IssuerQ // RemoveIssuerQueue removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue. func (b *BasicBuffer) RemoveIssuerQueue(issuerID iotago.AccountID) { - element, ok := b.activeIssuers.Get(issuerID) - if !ok { + element, exists := b.activeIssuers.Get(issuerID) + if !exists { return } issuerQueue, isIQ := element.Value.(*IssuerQueue) @@ -135,8 +135,8 @@ func (b *BasicBuffer) RemoveIssuerQueue(issuerID iotago.AccountID) { // RemoveIssuerQueueIfEmpty removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue if it is empty. func (b *BasicBuffer) RemoveIssuerQueueIfEmpty(issuerID iotago.AccountID) { - element, ok := b.activeIssuers.Get(issuerID) - if !ok { + element, exists := b.activeIssuers.Get(issuerID) + if !exists { return } issuerQueue, isIQ := element.Value.(*IssuerQueue) @@ -168,25 +168,6 @@ func (b *BasicBuffer) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu return nil, true } -// Unsubmit removes a block from the submitted blocks. -// If that block is already marked as ready, Unsubmit has no effect. -func (b *BasicBuffer) Unsubmit(block *blocks.Block) bool { - issuerID := block.IssuerID() - - issuerQueue := b.IssuerQueue(issuerID) - if issuerQueue == nil { - return false - } - - if !issuerQueue.Unsubmit(block) { - return false - } - - b.size.Dec() - - return true -} - // Ready marks a previously submitted block as ready to be scheduled. func (b *BasicBuffer) Ready(block *blocks.Block) bool { issuerQueue := b.IssuerQueue(block.IssuerID()) @@ -205,7 +186,7 @@ func (b *BasicBuffer) ReadyBlocksCount() (readyBlocksCount int) { } for q := start; ; { - readyBlocksCount += q.inbox.Len() + readyBlocksCount += q.readyHeap.Len() q = b.Next() if q == start { break @@ -222,8 +203,8 @@ func (b *BasicBuffer) TotalBlocksCount() (blocksCount int) { return } for q := start; ; { - blocksCount += q.inbox.Len() - blocksCount += q.submitted.Size() + blocksCount += q.readyHeap.Len() + blocksCount += q.nonReadyMap.Size() q = b.Next() if q == start { break @@ -296,23 +277,31 @@ func (b *BasicBuffer) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBu // remove as many blocks as necessary to stay within max buffer size for b.Size() > maxBuffer { // find the longest mana-scaled queue - maxIssuerID := b.longestQueueIssuerID(quantumFunc) - if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil { - if tail := longestQueue.RemoveTail(); tail != nil { - b.size.Dec() - droppedBlocks = append(droppedBlocks, tail) - } + maxIssuerID := b.mustLongestQueueIssuerID(quantumFunc) + longestQueue := b.IssuerQueue(maxIssuerID) + if longestQueue == nil { + panic("buffer is full, but longest queue does not exist") } + + tail := longestQueue.RemoveTail() + if tail == nil { + panic("buffer is full, but tail of longest queue does not exist") + } + + b.size.Dec() + droppedBlocks = append(droppedBlocks, tail) } return droppedBlocks } -func (b *BasicBuffer) longestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID { +// mustLongestQueueIssuerID returns the issuerID of the longest queue in the buffer. +// This function panics if no longest queue is found. +func (b *BasicBuffer) mustLongestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID { start := b.Current() ringStart := b.ring maxScale := math.Inf(-1) - var maxIssuerID iotago.AccountID + maxIssuerID := iotago.EmptyAccountID for q := start; ; { if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 { if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale { @@ -332,6 +321,10 @@ func (b *BasicBuffer) longestQueueIssuerID(quantumFunc func(iotago.AccountID) De } } + if maxIssuerID == iotago.EmptyAccountID { + panic("no longest queue determined") + } + return maxIssuerID } diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go index 1a04772ce..68113309b 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go @@ -18,18 +18,18 @@ import ( // IssuerQueue keeps the submitted blocks of an issuer. type IssuerQueue struct { - issuerID iotago.AccountID - submitted *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block] - inbox generalheap.Heap[timed.HeapKey, *blocks.Block] - size atomic.Int64 - work atomic.Int64 + issuerID iotago.AccountID + nonReadyMap *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block] + readyHeap generalheap.Heap[timed.HeapKey, *blocks.Block] + size atomic.Int64 + work atomic.Int64 } // NewIssuerQueue returns a new IssuerQueue. func NewIssuerQueue(issuerID iotago.AccountID) *IssuerQueue { return &IssuerQueue{ - issuerID: issuerID, - submitted: shrinkingmap.New[iotago.BlockID, *blocks.Block](), + issuerID: issuerID, + nonReadyMap: shrinkingmap.New[iotago.BlockID, *blocks.Block](), } } @@ -58,18 +58,18 @@ func (q *IssuerQueue) IssuerID() iotago.AccountID { return q.issuerID } -// Submit submits a block for the queue. +// Submit submits a block for the queue. Return true if submitted and false if it was already submitted. func (q *IssuerQueue) Submit(element *blocks.Block) bool { - // this is just a debugging check, it will never happen in practice + // this is just a debugging check, it should never happen in practice and we should panic if it does. if blkIssuerID := element.IssuerID(); q.issuerID != blkIssuerID { panic(fmt.Sprintf("issuerqueue: queue issuer ID(%x) and issuer ID(%x) does not match.", q.issuerID, blkIssuerID)) } - if _, submitted := q.submitted.Get(element.ID()); submitted { + if _, submitted := q.nonReadyMap.Get(element.ID()); submitted { return false } - q.submitted.Set(element.ID(), element) + q.nonReadyMap.Set(element.ID(), element) q.size.Inc() q.work.Add(int64(element.WorkScore())) @@ -78,11 +78,11 @@ func (q *IssuerQueue) Submit(element *blocks.Block) bool { // Unsubmit removes a previously submitted block from the queue. func (q *IssuerQueue) Unsubmit(block *blocks.Block) bool { - if _, submitted := q.submitted.Get(block.ID()); !submitted { + if _, submitted := q.nonReadyMap.Get(block.ID()); !submitted { return false } - q.submitted.Delete(block.ID()) + q.nonReadyMap.Delete(block.ID()) q.size.Dec() q.work.Sub(int64(block.WorkScore())) @@ -91,21 +91,21 @@ func (q *IssuerQueue) Unsubmit(block *blocks.Block) bool { // Ready marks a previously submitted block as ready to be scheduled. func (q *IssuerQueue) Ready(block *blocks.Block) bool { - if _, submitted := q.submitted.Get(block.ID()); !submitted { + if _, submitted := q.nonReadyMap.Get(block.ID()); !submitted { return false } - q.submitted.Delete(block.ID()) - heap.Push(&q.inbox, &generalheap.HeapElement[timed.HeapKey, *blocks.Block]{Value: block, Key: timed.HeapKey(block.IssuingTime())}) + q.nonReadyMap.Delete(block.ID()) + heap.Push(&q.readyHeap, &generalheap.HeapElement[timed.HeapKey, *blocks.Block]{Value: block, Key: timed.HeapKey(block.IssuingTime())}) return true } // IDs returns the IDs of all submitted blocks (ready or not). func (q *IssuerQueue) IDs() (ids []iotago.BlockID) { - ids = q.submitted.Keys() + ids = q.nonReadyMap.Keys() - for _, block := range q.inbox { + for _, block := range q.readyHeap { ids = append(ids, block.Value.ID()) } @@ -114,22 +114,22 @@ func (q *IssuerQueue) IDs() (ids []iotago.BlockID) { // Front returns the first ready block in the queue. func (q *IssuerQueue) Front() *blocks.Block { - if q == nil || q.inbox.Len() == 0 { + if q == nil || q.readyHeap.Len() == 0 { return nil } - return q.inbox[0].Value + return q.readyHeap[0].Value } // PopFront removes the first ready block from the queue. func (q *IssuerQueue) PopFront() *blocks.Block { - if q.inbox.Len() == 0 { + if q.readyHeap.Len() == 0 { return nil } - heapElement, isHeapElement := heap.Pop(&q.inbox).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block]) + heapElement, isHeapElement := heap.Pop(&q.readyHeap).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block]) if !isHeapElement { - return nil + panic("unable to pop from a non-empty heap.") } blk := heapElement.Value q.size.Dec() @@ -138,31 +138,32 @@ func (q *IssuerQueue) PopFront() *blocks.Block { return blk } +// RemoveTail removes the oldest block from the queue. func (q *IssuerQueue) RemoveTail() *blocks.Block { - var oldestSubmittedBlock *blocks.Block - q.submitted.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool { - if oldestSubmittedBlock == nil || oldestSubmittedBlock.IssuingTime().After(block.IssuingTime()) { - oldestSubmittedBlock = block + var oldestNonReadyBlock *blocks.Block + q.nonReadyMap.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool { + if oldestNonReadyBlock == nil || oldestNonReadyBlock.IssuingTime().After(block.IssuingTime()) { + oldestNonReadyBlock = block } return true }) - tail := q.tail() - // if heap tail does not exist or tail is newer than oldest submitted block, unsubmit oldest block - if oldestSubmittedBlock != nil && (tail < 0 || q.inbox[tail].Key.CompareTo(timed.HeapKey(oldestSubmittedBlock.IssuingTime())) > 0) { - q.Unsubmit(oldestSubmittedBlock) - - return oldestSubmittedBlock - } else if tail < 0 { - // should never happen that the oldest submitted block does not exist and the tail does not exist. - return nil + heapTailIndex := q.heapTail() + // if heap tail (oldest ready block) does not exist or is newer than oldest non-ready block, unsubmit the oldest non-ready block + if oldestNonReadyBlock != nil && (heapTailIndex < 0 || q.readyHeap[heapTailIndex].Key.CompareTo(timed.HeapKey(oldestNonReadyBlock.IssuingTime())) > 0) { + if q.Unsubmit(oldestNonReadyBlock) { + return oldestNonReadyBlock + } + } else if heapTailIndex < 0 { // the heap is empty + // should never happen that the oldest submitted block does not exist and the heap is empty. + panic("heap tail and oldest submitted block do not exist. Trying to remove tail of an empty queue?") } - // if the tail exists and is older than the oldest submitted block, drop it - heapElement, isHeapElement := heap.Remove(&q.inbox, tail).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block]) + // if the oldest ready block is older than the oldest non-ready block, drop it + heapElement, isHeapElement := heap.Remove(&q.readyHeap, heapTailIndex).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block]) if !isHeapElement { - return nil + panic("trying to remove a heap element that does not exist.") } blk := heapElement.Value q.size.Dec() @@ -171,8 +172,8 @@ func (q *IssuerQueue) RemoveTail() *blocks.Block { return blk } -func (q *IssuerQueue) tail() int { - h := q.inbox +func (q *IssuerQueue) heapTail() int { + h := q.readyHeap if h.Len() <= 0 { return -1 } diff --git a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go index 3c20e4eca..4239d831a 100644 --- a/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go +++ b/pkg/protocol/engine/congestioncontrol/scheduler/drr/scheduler.go @@ -580,15 +580,15 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Def func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) { q := s.basicBuffer.IssuerQueue(issuerID) - q.submitted.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool { + q.nonReadyMap.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool { block.SetDropped() s.events.BlockDropped.Trigger(block, err) return true }) - for q.inbox.Len() > 0 { - block := q.PopFront() + for i := 0; i < q.readyHeap.Len(); i++ { + block := q.readyHeap[i].Value block.SetDropped() s.events.BlockDropped.Trigger(block, err) }