Skip to content

Commit

Permalink
Merge pull request #946 from iotaledger/fix/scheduler-bug
Browse files Browse the repository at this point in the history
Fix scheduler bug and add panics
  • Loading branch information
muXxer authored Apr 29, 2024
2 parents 49f7f6d + 809012e commit 04a114d
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ linters:
- importas
- inamedparam
#- interfacebloat
- intrange
#- intrange # TODO: re-enable after https://github.com/ckaznocha/intrange v0.1.2 release is merged in golangci-lint
#- ireturn
#- lll
- loggercheck
Expand Down
67 changes: 30 additions & 37 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/basicbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ func (b *BasicBuffer) Size() int {

// IssuerQueue returns the queue for the corresponding issuer.
func (b *BasicBuffer) IssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
element, exists := b.activeIssuers.Get(issuerID)
if !exists {
return nil
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
if !isIQ {
return nil
panic("buffer contains elements that are not issuer queues")
}

return issuerQueue
Expand Down Expand Up @@ -119,8 +119,8 @@ func (b *BasicBuffer) GetOrCreateIssuerQueue(issuerID iotago.AccountID) *IssuerQ

// RemoveIssuerQueue removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue.
func (b *BasicBuffer) RemoveIssuerQueue(issuerID iotago.AccountID) {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
element, exists := b.activeIssuers.Get(issuerID)
if !exists {
return
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
Expand All @@ -135,8 +135,8 @@ func (b *BasicBuffer) RemoveIssuerQueue(issuerID iotago.AccountID) {

// RemoveIssuerQueueIfEmpty removes all blocks (submitted and ready) for the given issuer and deletes the issuer queue if it is empty.
func (b *BasicBuffer) RemoveIssuerQueueIfEmpty(issuerID iotago.AccountID) {
element, ok := b.activeIssuers.Get(issuerID)
if !ok {
element, exists := b.activeIssuers.Get(issuerID)
if !exists {
return
}
issuerQueue, isIQ := element.Value.(*IssuerQueue)
Expand Down Expand Up @@ -168,25 +168,6 @@ func (b *BasicBuffer) Submit(blk *blocks.Block, issuerQueue *IssuerQueue, quantu
return nil, true
}

// Unsubmit removes a block from the submitted blocks.
// If that block is already marked as ready, Unsubmit has no effect.
func (b *BasicBuffer) Unsubmit(block *blocks.Block) bool {
issuerID := block.IssuerID()

issuerQueue := b.IssuerQueue(issuerID)
if issuerQueue == nil {
return false
}

if !issuerQueue.Unsubmit(block) {
return false
}

b.size.Dec()

return true
}

// Ready marks a previously submitted block as ready to be scheduled.
func (b *BasicBuffer) Ready(block *blocks.Block) bool {
issuerQueue := b.IssuerQueue(block.IssuerID())
Expand All @@ -205,7 +186,7 @@ func (b *BasicBuffer) ReadyBlocksCount() (readyBlocksCount int) {
}

for q := start; ; {
readyBlocksCount += q.inbox.Len()
readyBlocksCount += q.readyHeap.Len()
q = b.Next()
if q == start {
break
Expand All @@ -222,8 +203,8 @@ func (b *BasicBuffer) TotalBlocksCount() (blocksCount int) {
return
}
for q := start; ; {
blocksCount += q.inbox.Len()
blocksCount += q.submitted.Size()
blocksCount += q.readyHeap.Len()
blocksCount += q.nonReadyMap.Size()
q = b.Next()
if q == start {
break
Expand Down Expand Up @@ -296,23 +277,31 @@ func (b *BasicBuffer) dropTail(quantumFunc func(iotago.AccountID) Deficit, maxBu
// remove as many blocks as necessary to stay within max buffer size
for b.Size() > maxBuffer {
// find the longest mana-scaled queue
maxIssuerID := b.longestQueueIssuerID(quantumFunc)
if longestQueue := b.IssuerQueue(maxIssuerID); longestQueue != nil {
if tail := longestQueue.RemoveTail(); tail != nil {
b.size.Dec()
droppedBlocks = append(droppedBlocks, tail)
}
maxIssuerID := b.mustLongestQueueIssuerID(quantumFunc)
longestQueue := b.IssuerQueue(maxIssuerID)
if longestQueue == nil {
panic("buffer is full, but longest queue does not exist")
}

tail := longestQueue.RemoveTail()
if tail == nil {
panic("buffer is full, but tail of longest queue does not exist")
}

b.size.Dec()
droppedBlocks = append(droppedBlocks, tail)
}

return droppedBlocks
}

func (b *BasicBuffer) longestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID {
// mustLongestQueueIssuerID returns the issuerID of the longest queue in the buffer.
// This function panics if no longest queue is found.
func (b *BasicBuffer) mustLongestQueueIssuerID(quantumFunc func(iotago.AccountID) Deficit) iotago.AccountID {
start := b.Current()
ringStart := b.ring
maxScale := math.Inf(-1)
var maxIssuerID iotago.AccountID
maxIssuerID := iotago.EmptyAccountID
for q := start; ; {
if issuerQuantum := quantumFunc(q.IssuerID()); issuerQuantum > 0 {
if scale := float64(q.Work()) / float64(issuerQuantum); scale > maxScale {
Expand All @@ -332,6 +321,10 @@ func (b *BasicBuffer) longestQueueIssuerID(quantumFunc func(iotago.AccountID) De
}
}

if maxIssuerID == iotago.EmptyAccountID {
panic("no longest queue determined")
}

return maxIssuerID
}

Expand Down
83 changes: 42 additions & 41 deletions pkg/protocol/engine/congestioncontrol/scheduler/drr/issuerqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (

// IssuerQueue keeps the submitted blocks of an issuer.
type IssuerQueue struct {
issuerID iotago.AccountID
submitted *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block]
inbox generalheap.Heap[timed.HeapKey, *blocks.Block]
size atomic.Int64
work atomic.Int64
issuerID iotago.AccountID
nonReadyMap *shrinkingmap.ShrinkingMap[iotago.BlockID, *blocks.Block]
readyHeap generalheap.Heap[timed.HeapKey, *blocks.Block]
size atomic.Int64
work atomic.Int64
}

// NewIssuerQueue returns a new IssuerQueue.
func NewIssuerQueue(issuerID iotago.AccountID) *IssuerQueue {
return &IssuerQueue{
issuerID: issuerID,
submitted: shrinkingmap.New[iotago.BlockID, *blocks.Block](),
issuerID: issuerID,
nonReadyMap: shrinkingmap.New[iotago.BlockID, *blocks.Block](),
}
}

Expand Down Expand Up @@ -58,18 +58,18 @@ func (q *IssuerQueue) IssuerID() iotago.AccountID {
return q.issuerID
}

// Submit submits a block for the queue.
// Submit submits a block for the queue. Return true if submitted and false if it was already submitted.
func (q *IssuerQueue) Submit(element *blocks.Block) bool {
// this is just a debugging check, it will never happen in practice
// this is just a debugging check, it should never happen in practice and we should panic if it does.
if blkIssuerID := element.IssuerID(); q.issuerID != blkIssuerID {
panic(fmt.Sprintf("issuerqueue: queue issuer ID(%x) and issuer ID(%x) does not match.", q.issuerID, blkIssuerID))
}

if _, submitted := q.submitted.Get(element.ID()); submitted {
if _, submitted := q.nonReadyMap.Get(element.ID()); submitted {
return false
}

q.submitted.Set(element.ID(), element)
q.nonReadyMap.Set(element.ID(), element)
q.size.Inc()
q.work.Add(int64(element.WorkScore()))

Expand All @@ -78,11 +78,11 @@ func (q *IssuerQueue) Submit(element *blocks.Block) bool {

// Unsubmit removes a previously submitted block from the queue.
func (q *IssuerQueue) Unsubmit(block *blocks.Block) bool {
if _, submitted := q.submitted.Get(block.ID()); !submitted {
if _, submitted := q.nonReadyMap.Get(block.ID()); !submitted {
return false
}

q.submitted.Delete(block.ID())
q.nonReadyMap.Delete(block.ID())
q.size.Dec()
q.work.Sub(int64(block.WorkScore()))

Expand All @@ -91,21 +91,21 @@ func (q *IssuerQueue) Unsubmit(block *blocks.Block) bool {

// Ready marks a previously submitted block as ready to be scheduled.
func (q *IssuerQueue) Ready(block *blocks.Block) bool {
if _, submitted := q.submitted.Get(block.ID()); !submitted {
if _, submitted := q.nonReadyMap.Get(block.ID()); !submitted {
return false
}

q.submitted.Delete(block.ID())
heap.Push(&q.inbox, &generalheap.HeapElement[timed.HeapKey, *blocks.Block]{Value: block, Key: timed.HeapKey(block.IssuingTime())})
q.nonReadyMap.Delete(block.ID())
heap.Push(&q.readyHeap, &generalheap.HeapElement[timed.HeapKey, *blocks.Block]{Value: block, Key: timed.HeapKey(block.IssuingTime())})

return true
}

// IDs returns the IDs of all submitted blocks (ready or not).
func (q *IssuerQueue) IDs() (ids []iotago.BlockID) {
ids = q.submitted.Keys()
ids = q.nonReadyMap.Keys()

for _, block := range q.inbox {
for _, block := range q.readyHeap {
ids = append(ids, block.Value.ID())
}

Expand All @@ -114,22 +114,22 @@ func (q *IssuerQueue) IDs() (ids []iotago.BlockID) {

// Front returns the first ready block in the queue.
func (q *IssuerQueue) Front() *blocks.Block {
if q == nil || q.inbox.Len() == 0 {
if q == nil || q.readyHeap.Len() == 0 {
return nil
}

return q.inbox[0].Value
return q.readyHeap[0].Value
}

// PopFront removes the first ready block from the queue.
func (q *IssuerQueue) PopFront() *blocks.Block {
if q.inbox.Len() == 0 {
if q.readyHeap.Len() == 0 {
return nil
}

heapElement, isHeapElement := heap.Pop(&q.inbox).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block])
heapElement, isHeapElement := heap.Pop(&q.readyHeap).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block])
if !isHeapElement {
return nil
panic("unable to pop from a non-empty heap.")
}
blk := heapElement.Value
q.size.Dec()
Expand All @@ -138,31 +138,32 @@ func (q *IssuerQueue) PopFront() *blocks.Block {
return blk
}

// RemoveTail removes the oldest block from the queue.
func (q *IssuerQueue) RemoveTail() *blocks.Block {
var oldestSubmittedBlock *blocks.Block
q.submitted.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool {
if oldestSubmittedBlock == nil || oldestSubmittedBlock.IssuingTime().After(block.IssuingTime()) {
oldestSubmittedBlock = block
var oldestNonReadyBlock *blocks.Block
q.nonReadyMap.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool {
if oldestNonReadyBlock == nil || oldestNonReadyBlock.IssuingTime().After(block.IssuingTime()) {
oldestNonReadyBlock = block
}

return true
})

tail := q.tail()
// if heap tail does not exist or tail is newer than oldest submitted block, unsubmit oldest block
if oldestSubmittedBlock != nil && (tail < 0 || q.inbox[tail].Key.CompareTo(timed.HeapKey(oldestSubmittedBlock.IssuingTime())) > 0) {
q.Unsubmit(oldestSubmittedBlock)

return oldestSubmittedBlock
} else if tail < 0 {
// should never happen that the oldest submitted block does not exist and the tail does not exist.
return nil
heapTailIndex := q.heapTail()
// if heap tail (oldest ready block) does not exist or is newer than oldest non-ready block, unsubmit the oldest non-ready block
if oldestNonReadyBlock != nil && (heapTailIndex < 0 || q.readyHeap[heapTailIndex].Key.CompareTo(timed.HeapKey(oldestNonReadyBlock.IssuingTime())) > 0) {
if q.Unsubmit(oldestNonReadyBlock) {
return oldestNonReadyBlock
}
} else if heapTailIndex < 0 { // the heap is empty
// should never happen that the oldest submitted block does not exist and the heap is empty.
panic("heap tail and oldest submitted block do not exist. Trying to remove tail of an empty queue?")
}

// if the tail exists and is older than the oldest submitted block, drop it
heapElement, isHeapElement := heap.Remove(&q.inbox, tail).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block])
// if the oldest ready block is older than the oldest non-ready block, drop it
heapElement, isHeapElement := heap.Remove(&q.readyHeap, heapTailIndex).(*generalheap.HeapElement[timed.HeapKey, *blocks.Block])
if !isHeapElement {
return nil
panic("trying to remove a heap element that does not exist.")
}
blk := heapElement.Value
q.size.Dec()
Expand All @@ -171,8 +172,8 @@ func (q *IssuerQueue) RemoveTail() *blocks.Block {
return blk
}

func (q *IssuerQueue) tail() int {
h := q.inbox
func (q *IssuerQueue) heapTail() int {
h := q.readyHeap
if h.Len() <= 0 {
return -1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,15 +580,15 @@ func (s *Scheduler) selectIssuer(start *IssuerQueue, slot iotago.SlotIndex) (Def

func (s *Scheduler) removeIssuer(issuerID iotago.AccountID, err error) {
q := s.basicBuffer.IssuerQueue(issuerID)
q.submitted.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool {
q.nonReadyMap.ForEach(func(_ iotago.BlockID, block *blocks.Block) bool {
block.SetDropped()
s.events.BlockDropped.Trigger(block, err)

return true
})

for q.inbox.Len() > 0 {
block := q.PopFront()
for i := 0; i < q.readyHeap.Len(); i++ {
block := q.readyHeap[i].Value
block.SetDropped()
s.events.BlockDropped.Trigger(block, err)
}
Expand Down

0 comments on commit 04a114d

Please sign in to comment.